[X2Go-Commits] x2gobroker.git - build-main (branch) updated: 0.0.2.2-9-g1d5214d

X2Go dev team git-admin at x2go.org
Fri Jun 7 23:22:37 CEST 2013


The branch, build-main has been updated
       via  1d5214de08f6716826ed311f1751bf58ebd8c66e (commit)
      from  f51f163d2e8ade102a8a3d7484e0abe22f2ab4f3 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.

- Log -----------------------------------------------------------------
-----------------------------------------------------------------------

Summary of changes:
 debian/changelog                  |    4 +
 debian/control                    |    2 +-
 lib/x2gobroker-agent.pl           |   12 ++-
 x2gobroker/agent.py               |   86 ++++++++++++++++++++--
 x2gobroker/brokers/base_broker.py |  145 +++++++++++++++++++++++--------------
 5 files changed, 184 insertions(+), 65 deletions(-)

The diff of changes is:
diff --git a/debian/changelog b/debian/changelog
index 009de91..9618aec 100644
--- a/debian/changelog
+++ b/debian/changelog
@@ -9,6 +9,10 @@ x2gobroker (0.0.2.3-0~x2go1) UNRELEASED; urgency=low
       sessions.
     - Make enable-plain-output, enable-uccs-output functional.
     - Add agent-quer-mode »NONE«. Disable X2Go Broker Agent calls completely.
+    - Add status={S,R} to session profile list items when returned through X2Go
+      Session Broker. (Fixes: #152). Handle taking over of running sessions and
+      resuming sessions more reliably. Provide mechanism to suspend/terminate
+      sessions through X2Go Server's (>= 4.0.0.3) x2gocleansessions daemon.
 
  -- Mike Gabriel <mike.gabriel at das-netzwerkteam.de>  Wed, 22 May 2013 17:42:12 +0200
 
diff --git a/debian/control b/debian/control
index cfe63c6..a55d7b3 100644
--- a/debian/control
+++ b/debian/control
@@ -194,7 +194,7 @@ Depends:
  python-paramiko,
  perl,
  adduser,
- x2goserver,
+ x2goserver (>= 4.0.0.3-0~),
 Description: X2Go http(s) based session broker (common files)
  X2Go is a server based computing environment with
     - session resuming
diff --git a/lib/x2gobroker-agent.pl b/lib/x2gobroker-agent.pl
index 5fc5459..7c80ae4 100755
--- a/lib/x2gobroker-agent.pl
+++ b/lib/x2gobroker-agent.pl
@@ -182,12 +182,20 @@ if($mode eq 'delauthkey')
 	DelAuthKey($uid, $uidNumber, $gidNumber, $home, $pubkey, $authkeyfile);
 }
 
-if($mode eq 'suspend')
+if($mode eq 'suspendsession')
 {
 	InitX2GoUser($uid, $uidNumber, $gidNumber, $home);
 	print "OK\n";
 	my $sid=shift;
-	exec ("/bin/su - $uid -c \"x2gosuspend-session $sid\"");
+	exec ("/bin/su - $uid -c \"\$(x2gopath lib)/x2gochangestatus S $sid\"");
+}
+
+if($mode eq 'terminatesession')
+{
+	InitX2GoUser($uid, $uidNumber, $gidNumber, $home);
+	print "OK\n";
+	my $sid=shift;
+	exec ("/bin/su - $uid -c \"\$(x2gopath lib)/x2gochangestatus T $sid\"");
 }
 
 if($mode eq 'ping')
diff --git a/x2gobroker/agent.py b/x2gobroker/agent.py
index b106dd4..e88c805 100644
--- a/x2gobroker/agent.py
+++ b/x2gobroker/agent.py
@@ -70,7 +70,7 @@ def call_local_broker_agent(username, mode, cmdline_args=[]):
     ]
 
     for cmdline_arg in cmdline_args:
-        cmd_line.append('"{arg}"'.format(arg=cmdline_arg))
+        cmd_line.append('\'{arg}\''.format(arg=cmdline_arg))
 
     logger_broker.debug('Executing agent command locally: {cmd}'.format(cmd=" ".join(cmd_line)))
     agent_process = subprocess.Popen(cmd_line,
@@ -139,6 +139,23 @@ def call_remote_broker_agent(username, mode, cmdline_args=[], remote_agent=None)
         raise x2gobroker.x2gobroker_exceptions.X2GoBrokerAgentException('Query to remote X2Go Broker Agent (user: {user}, hostname: {hostname}, port: {port}) failed'.format(user=remote_username, hostname=remote_hostname, port=remote_port))
 
 
+def ping(query_mode='LOCAL', remote_agent=None):
+    """\
+    Ping X2Go Broker Agent.
+
+    @param query_mode: query mode used when calling X2Go Broker Agent (C{LOCAL} or C{SSH})
+    @type query_mode: C{unicode}
+    @param remote_agent: information about the remote agent that is to be called.
+    @type remote_agent: C{dict}
+
+    """
+    username='foo'
+    if unicode(query_mode).upper() == u'LOCAL':
+        return call_local_broker_agent(username, mode='ping')
+    else:
+        return call_remote_broker_agent(username, mode='ping', remote_agent=remote_agent)
+
+
 def list_sessions(username, query_mode='LOCAL', remote_agent=None):
     """\
     Query X2Go Broker Agent for a session list for a given username.
@@ -151,12 +168,67 @@ def list_sessions(username, query_mode='LOCAL', remote_agent=None):
     @type remote_agent: C{dict}
 
     """
-    if query_mode.upper() == u'LOCAL':
+    if unicode(query_mode).upper() == u'LOCAL':
         return call_local_broker_agent(username, mode='listsessions')
     else:
         return call_remote_broker_agent(username, mode='listsessions', remote_agent=remote_agent)
 
 
+def suspend_session(username, session_name, query_mode='LOCAL', remote_agent=None):
+    """\
+    Trigger a session suspension via the X2Go Broker Agent.
+
+    @param username: suspend the session on behalf of this username
+    @type username: C{unicode}
+    @param query_mode: query mode used when calling X2Go Broker Agent (C{LOCAL} or C{SSH})
+    @type query_mode: C{unicode}
+    @param remote_agent: information about the remote agent that is to be called.
+    @type remote_agent: C{dict}
+
+    """
+    if unicode(query_mode).upper() == u'LOCAL':
+        return call_local_broker_agent(username, mode='suspendsession', cmdline_args=[session_name, ], )
+    else:
+        return call_remote_broker_agent(username, mode='suspendsession', cmdline_args=[session_name, ], remote_agent=remote_agent)
+
+
+def terminate_session(username, session_name, query_mode='LOCAL', remote_agent=None):
+    """\
+    Trigger a session termination via the X2Go Broker Agent.
+
+    @param username: terminate the session on behalf of this username
+    @type username: C{unicode}
+    @param query_mode: query mode used when calling X2Go Broker Agent (C{LOCAL} or C{SSH})
+    @type query_mode: C{unicode}
+    @param remote_agent: information about the remote agent that is to be called.
+    @type remote_agent: C{dict}
+
+    """
+    if unicode(query_mode).upper() == u'LOCAL':
+        return call_local_broker_agent(username, mode='terminatesession', cmdline_args=[session_name, ], )
+    else:
+        return call_remote_broker_agent(username, mode='terminatesession', cmdline_args=[session_name, ], remote_agent=remote_agent)
+
+
+def has_sessions(username, query_mode='LOCAL', remote_agent=None):
+    """\
+    Query X2Go Broker Agent to detect running/suspended sessions on
+    the remote X2Go Server (farm).
+
+    @param username: run the query on behalf of this username
+    @type username: C{unicode}
+    @param query_mode: query mode used when calling X2Go Broker Agent (C{LOCAL} or C{SSH})
+    @type query_mode: C{unicode}
+    @param remote_agent: information about the remote agent that is to be called.
+    @type remote_agent: C{dict}
+
+    @return: (<running-session-servers>, <suspended-session-servers>), a tuple of two lists of server names (an empty list means no session in that state)
+    @rtype: C{tuple}
+
+    """
+    _session_list = list_sessions(username, query_mode=query_mode, remote_agent=remote_agent)
+    return ([ s.split('|')[3] for s in _session_list if s.split('|')[4] == 'R' ], [ s.split('|')[3] for s in _session_list if s.split('|')[4] == 'S' ])
+
 def find_busy_servers(username, query_mode='LOCAL', remote_agent=None):
     """\
     Query X2Go Broker Agent for a list of  servers with running
@@ -173,7 +245,7 @@ def find_busy_servers(username, query_mode='LOCAL', remote_agent=None):
     @type remote_agent: C{dict}
 
     """
-    if query_mode.upper() == u'LOCAL':
+    if unicode(query_mode).upper() == u'LOCAL':
         server_list = call_local_broker_agent(username, mode='findbusyservers')
     else:
         server_list = call_remote_broker_agent(username, mode='findbusyservers', remote_agent=remote_agent)
@@ -205,7 +277,7 @@ def add_authorized_key(username, pubkey_hash, authorized_keys_file='%h/.x2go/aut
     @type remote_agent: C{dict}
 
     """
-    if query_mode.upper() == u'LOCAL':
+    if unicode(query_mode).upper() == u'LOCAL':
         return call_local_broker_agent(username, mode='addauthkey', cmdline_args=[pubkey_hash, authorized_keys_file, ])
     else:
         return call_remote_broker_agent(username, mode='addauthkey', cmdline_args=[pubkey_hash, authorized_keys_file, ], remote_agent=remote_agent)
@@ -228,7 +300,7 @@ def delete_authorized_key(username, pubkey_hash, authorized_keys_file='%h/.x2go/
 
     """
     # this is for the logger output
-    if remote_agent is None:
+    if remote_agent in  (None, u'LOCAL'):
         _hostname = 'LOCAL'
     else:
         _hostname = remote_agent['hostname']
@@ -237,7 +309,7 @@ def delete_authorized_key(username, pubkey_hash, authorized_keys_file='%h/.x2go/
         delayed_execution(delete_authorized_key, delay=delay_deletion, username=username, pubkey_hash=pubkey_hash, authorized_keys_file=authorized_keys_file, query_mode=query_mode, remote_agent=remote_agent, )
         logger_broker.debug('Scheduled deletion of authorized key in {delay}s: user={user}, host={host}'.format(delay=delay_deletion, user=username, host=_hostname))
     else:
-        if query_mode.upper() == u'LOCAL':
+        if unicode(query_mode).upper() == u'LOCAL':
             return call_local_broker_agent(username, mode='delauthkey', cmdline_args=[pubkey_hash, authorized_keys_file, ])
         else:
             return call_remote_broker_agent(username, mode='delauthkey', cmdline_args=[pubkey_hash, authorized_keys_file, ], remote_agent=remote_agent)
@@ -257,7 +329,7 @@ def get_servers(username, query_mode='LOCAL', remote_agent=None):
     @type remote_agent: C{dict}
 
     """
-    if query_mode.upper() == u'LOCAL':
+    if unicode(query_mode).upper() == u'LOCAL':
         return call_local_broker_agent(username, mode='getservers')
     else:
         return call_local_broker_agent(username, mode='getservers', remote_agent=remote_agent)
diff --git a/x2gobroker/brokers/base_broker.py b/x2gobroker/brokers/base_broker.py
index 2738105..d29714e 100644
--- a/x2gobroker/brokers/base_broker.py
+++ b/x2gobroker/brokers/base_broker.py
@@ -30,6 +30,7 @@ import copy
 import uuid
 import netaddr
 import random
+import time
 
 # X2Go Broker modules
 import x2gobroker.config
@@ -780,6 +781,50 @@ class X2GoBroker(object):
         except KeyError:
             return None
 
+
+    def get_remote_agent(self, profile_id, exclude_agents=[], ):
+        """\
+        Randomly choose a remote agent for agent query.
+
+        @param profile_id: choose remote agent for this profile ID
+        @type profile_id: C{unicode}
+        @param exclude_agents: a list of remote agent dict objects to be excluded from the random choice
+        @type exclude_agents: C{list}
+
+        """
+        remote_agent = None
+
+        agent_query_mode = self.get_agent_query_mode().upper()
+        if agent_query_mode == u'SSH':
+
+            profile = self.get_profile(profile_id)
+            server_list = profile[u'host']
+            random.shuffle(server_list)
+
+            for agent in exclude_agents:
+                if agent['hostname'] in server_list:
+                    server_list.remove(agent['hostname'])
+
+            while server_list:
+
+                remote_agent_server = server_list[-1]
+                remote_agent_port = profile[u'sshport']
+                remote_agent = {u'hostname': remote_agent_server, u'port': remote_agent_port, }
+
+                if x2gobroker.agent.ping(query_mode=agent_query_mode, remote_agent=remote_agent):
+                    break
+
+                server_list = server_list[0:-1]
+
+            else:
+                logger_broker.warning('base_broker.X2GoBroker.get_remote_agent(): failed to allocate any broker agent (query-mode: {query_mode}, remote_agent: {remote_agent})'.format(query_mode=agent_query_mode, remote_agent=remote_agent))
+
+        elif agent_query_mode == u'LOCAL':
+            # use a non-False value here, not used anywhere else...
+            remote_agent = 'LOCAL'
+
+        return remote_agent
+
     def list_profiles(self, username):
         """\
         Retrieve a list of available session profiles for the authenticated user.
@@ -795,6 +840,10 @@ class X2GoBroker(object):
         for profile_id in self.get_profile_ids():
             profile = self.get_profile(profile_id)
 
+            for key in profile.keys():
+                if key.startswith('host='):
+                    del profile[key]
+
             if self.use_session_autologin(profile_id):
                 profile['autologin'] = True
                 profile['key'] = 'key-comes-later'
@@ -806,34 +855,19 @@ class X2GoBroker(object):
             acls = self.get_profile_acls(profile_id)
 
             if self.check_profile_acls(username, acls):
-                list_of_profiles.update({profile_id: profile, })
-
-        return list_of_profiles
 
-    def random_remote_agent(self, profile_id, exclude_agents=[]):
-        """\
-        Randomly choose a remote agent for agent query.
-
-        @param profile_id: choose remote agent for this profile ID
-        @type profile_id: C{unicode}
-        @param exclude_agents: a list of remote agent dict objects to be exclude from the random choice
-        @type exclude_agents: C{list}
+                remote_agent = self.get_remote_agent(profile_id)
+                agent_query_mode = ( remote_agent == u'LOCAL') and u'LOCAL' or u'SSH'
+                if remote_agent:
+                    running_sessions, suspended_sessions = x2gobroker.agent.has_sessions(username, query_mode=agent_query_mode, remote_agent=remote_agent)
 
-        """
-        profile = self.get_profile(profile_id)
-        server_list = profile[u'host']
+                    if set(profile['host']).intersection(set(running_sessions)): profile['status'] = u'R'
+                    if set(profile['host']).intersection(set(suspended_sessions)): profile['status'] = u'S'
 
-        for agent in exclude_agents:
-            if agent['hostname'] in server_list:
-                server_list.remove(agent['hostname'])
+                profile['host'] = []
+                list_of_profiles.update({profile_id: profile, })
 
-        remote_agent = None
-        if server_list:
-            random.shuffle(server_list)
-            remote_agent_server = server_list[0]
-            remote_agent_port = profile[u'sshport']
-            remote_agent = {u'hostname': remote_agent_server, u'port': remote_agent_port, }
-        return remote_agent
+        return list_of_profiles
 
     def select_session(self, profile_id, username=None):
         """\
@@ -852,39 +886,24 @@ class X2GoBroker(object):
 
         # if we have more than one server, pick one server randomly for X2Go Broker Agent queries
         server_list = profile[u'host']
-        random.shuffle(server_list)
-        agent_query_mode = self.get_agent_query_mode().upper()
+        remote_agent = self.get_remote_agent(profile_id)
 
-        remote_agent = None
-        if agent_query_mode == u'SSH':
-            remote_agent = self.random_remote_agent(profile_id)
+        if remote_agent:
+            agent_query_mode = ( remote_agent == u'LOCAL') and u'LOCAL' or u'SSH'
 
         # detect best X2Go server for this user if load balancing is configured
-        if agent_query_mode != u'NONE' and len(server_list) >= 2 and username:
+        if remote_agent and len(server_list) >= 2 and username:
 
-            # query remote agent for session info, if one of the server's is down, we will try the next one...
+            # query remote agent for session info
             busy_servers = None
-            exclude_agents = []
-            while busy_servers is None and remote_agent:
-
-                try:
-                    busy_servers = x2gobroker.agent.find_busy_servers(username=username, query_mode=agent_query_mode, remote_agent=remote_agent)
-                except x2gobroker.x2gobroker_exceptions.X2GoBrokerAgentException:
-                    logger_broker.warning('base_broker.X2GoBroker.select_session(): failed to query broker agent (quey-mode: {query_mode}, remote_agent: {remote_agent})'.format(query_mode=agent_query_mode, remote_agent=remote_agent))
-
-                    if agent_query_mode == u'SSH':
-                        # mark this agent as bad
-                        exclude_agents.append(remote_agent)
-                        # also remove this agent from the list of available servers as the machine is probably down
-                        server_list.remove(remote_agent['hostname'])
-
-                        remote_agent = self.random_remote_agent(profile_id, exclude_agents=exclude_agents)
-                    else:
-                        remote_agent = None
+            try:
+                busy_servers = x2gobroker.agent.find_busy_servers(username=username, query_mode=agent_query_mode, remote_agent=remote_agent)
+            except x2gobroker.x2gobroker_exceptions.X2GoBrokerAgentException:
+                pass
 
             if busy_servers is not None:
 
-                # if we do not get until here, then all broker agent calls have failed!!!!
+                # if we do not get here, we failed to query a valid agent...
 
                 # when detecting the server load we have to support handling of differing subdomains (config
                 # file vs. server load returned by x2gobroker agent). Best approach: all members of a multi-node
@@ -924,7 +943,7 @@ class X2GoBroker(object):
                 best_server = busy_server_list[0][1]
 
             else:
-                logger_broker.warning('base_broker.X2GoBroker.select_session(): all expected broker agents failed to respond, this does not look good. We tried these agent hosts: {agent_hosts}'.format(agent_hosts=unicode(server_list)))
+                logger_broker.warning('base_broker.X2GoBroker.select_session(): no broker agent could be contacted, this does not look good. We tried these agent hosts: {agent_hosts}'.format(agent_hosts=unicode(server_list)))
                 if server_list: best_server = server_list[0]
                 else: return { 'server': 'no-server-available', 'port': profile[u'sshport'], }
 
@@ -943,16 +962,32 @@ class X2GoBroker(object):
             'port': profile[u'sshport'],
         }
 
-        # find already running sessions and resume the first one found
-        if agent_query_mode != u'NONE' and len(server_list) >= 2 and username:
+        # find already running/suspended sessions and resume the first one found
+        if remote_agent and server_list and username:
 
             session_list = x2gobroker.agent.list_sessions(username=username, query_mode=agent_query_mode, remote_agent=remote_agent)
             if session_list:
 
                 # if resuming, always select the first session in the list, there should only be one suspended session
                 try:
-                    server_name = session_list[0].split('|')[3]
-                    session_info = session_list[0]
+                    running_sessions = []
+                    suspended_sessions = []
+                    for session_info in session_list:
+                        if session_info.split('|')[4] == 'R':
+                            running_sessions.append(session_info)
+                        if session_info.split('|')[4] == 'S':
+                            suspended_sessions.append(session_info)
+
+                    # we prefer suspended sessions for resuming if we find sessions with both states of activity
+                    if suspended_sessions:
+                        session_info = suspended_sessions[0]
+                    else:
+                        session_info = running_sessions[0]
+                        x2gobroker.agent.suspend_session(username=username, session_name=session_info.split('|')[1], query_mode=agent_query_mode)
+                        # this is the turn-around time of x2gocleansessions, so we wait as long as the daemon needs to suspend the session
+                        time.sleep(2)
+                        session_info = session_info.replace('|R|', '|S|')
+                    server_name = session_info.split('|')[3]
 
                     # if we have an explicit IP address for server_name, let's use that instead...
                     try:
@@ -972,7 +1007,7 @@ class X2GoBroker(object):
 
 
         # session autologin feature
-        if agent_query_mode != u'NONE' and self.use_session_autologin(profile_id):
+        if remote_agent and self.use_session_autologin(profile_id) and username:
 
             # FIXME: we somehow have to find out about the username of the person at the broker client-side...
             # using the username used for broker login for now...


hooks/post-receive
-- 
x2gobroker.git (HTTP(S) Session broker for X2Go)

This is an automated email from the git hooks/post-receive script. It was
generated because a ref change was pushed to the repository containing
the project "x2gobroker.git" (HTTP(S) Session broker for X2Go).




More information about the x2go-commits mailing list