The branch, master has been updated via 1d5214de08f6716826ed311f1751bf58ebd8c66e (commit) from f51f163d2e8ade102a8a3d7484e0abe22f2ab4f3 (commit) Those revisions listed above that are new to this repository have not appeared on any other notification email; so we list those revisions in full, below. - Log ----------------------------------------------------------------- commit 1d5214de08f6716826ed311f1751bf58ebd8c66e Author: Mike Gabriel <mike.gabriel@das-netzwerkteam.de> Date: Wed Jun 5 01:26:24 2013 +0200 Add status={S,R} to session profile list items when returned through X2Go Session Broker. (Fixes: #152). Handle taking over of running sessions and resuming sessions more reliably. Provide mechanism to suspend/terminate sessions through X2Go Server's (>= 4.0.0.3) x2gocleansessions daemon. ----------------------------------------------------------------------- Summary of changes: debian/changelog | 4 + debian/control | 2 +- lib/x2gobroker-agent.pl | 12 ++- x2gobroker/agent.py | 86 ++++++++++++++++++++-- x2gobroker/brokers/base_broker.py | 145 +++++++++++++++++++++++-------------- 5 files changed, 184 insertions(+), 65 deletions(-) The diff of changes is: diff --git a/debian/changelog b/debian/changelog index 009de91..9618aec 100644 --- a/debian/changelog +++ b/debian/changelog @@ -9,6 +9,10 @@ x2gobroker (0.0.2.3-0~x2go1) UNRELEASED; urgency=low sessions. - Make enable-plain-output, enable-uccs-output functional. - Add agent-quer-mode »NONE«. Disable X2Go Broker Agent calls completely. + - Add status={S,R} to session profile list items when returned through X2Go + Session Broker. (Fixes: #152). Handle taking over of running sessions and + resuming sessions more reliably. Provide mechanism to suspend/terminate + sessions through X2Go Server's (>= 4.0.0.3) x2gocleansessions daemon. 
-- Mike Gabriel <mike.gabriel@das-netzwerkteam.de> Wed, 22 May 2013 17:42:12 +0200 diff --git a/debian/control b/debian/control index cfe63c6..a55d7b3 100644 --- a/debian/control +++ b/debian/control @@ -194,7 +194,7 @@ Depends: python-paramiko, perl, adduser, - x2goserver, + x2goserver (>= 4.0.0.3-0~), Description: X2Go http(s) based session broker (common files) X2Go is a server based computing environment with - session resuming diff --git a/lib/x2gobroker-agent.pl b/lib/x2gobroker-agent.pl index 5fc5459..7c80ae4 100755 --- a/lib/x2gobroker-agent.pl +++ b/lib/x2gobroker-agent.pl @@ -182,12 +182,20 @@ if($mode eq 'delauthkey') DelAuthKey($uid, $uidNumber, $gidNumber, $home, $pubkey, $authkeyfile); } -if($mode eq 'suspend') +if($mode eq 'suspendsession') { InitX2GoUser($uid, $uidNumber, $gidNumber, $home); print "OK\n"; my $sid=shift; - exec ("/bin/su - $uid -c \"x2gosuspend-session $sid\""); + exec ("/bin/su - $uid -c \"\$(x2gopath lib)/x2gochangestatus S $sid\""); +} + +if($mode eq 'terminatesession') +{ + InitX2GoUser($uid, $uidNumber, $gidNumber, $home); + print "OK\n"; + my $sid=shift; + exec ("/bin/su - $uid -c \"\$(x2gopath lib)/x2gochangestatus T $sid\""); } if($mode eq 'ping') diff --git a/x2gobroker/agent.py b/x2gobroker/agent.py index b106dd4..e88c805 100644 --- a/x2gobroker/agent.py +++ b/x2gobroker/agent.py @@ -70,7 +70,7 @@ def call_local_broker_agent(username, mode, cmdline_args=[]): ] for cmdline_arg in cmdline_args: - cmd_line.append('"{arg}"'.format(arg=cmdline_arg)) + cmd_line.append('\'{arg}\''.format(arg=cmdline_arg)) logger_broker.debug('Executing agent command locally: {cmd}'.format(cmd=" ".join(cmd_line))) agent_process = subprocess.Popen(cmd_line, @@ -139,6 +139,23 @@ def call_remote_broker_agent(username, mode, cmdline_args=[], remote_agent=None) raise x2gobroker.x2gobroker_exceptions.X2GoBrokerAgentException('Query to remote X2Go Broker Agent (user: {user}, hostname: {hostname}, port: {port}) failed'.format(user=remote_username, 
hostname=remote_hostname, port=remote_port)) +def ping(query_mode='LOCAL', remote_agent=None): + """\ + Ping X2Go Broker Agent. + + @param query_mode: query mode used when calling X2Go Broker Agent (C{LOCAL} or C{SSH}) + @type query_mode: C{unicode} + @param remote_agent: information about the remote agent that is to be called. + @type remote_agent: C{dict} + + """ + username='foo' + if unicode(query_mode).upper() == u'LOCAL': + return call_local_broker_agent(username, mode='ping') + else: + return call_remote_broker_agent(username, mode='ping', remote_agent=remote_agent) + + def list_sessions(username, query_mode='LOCAL', remote_agent=None): """\ Query X2Go Broker Agent for a session list for a given username. @@ -151,12 +168,67 @@ def list_sessions(username, query_mode='LOCAL', remote_agent=None): @type remote_agent: C{dict} """ - if query_mode.upper() == u'LOCAL': + if unicode(query_mode).upper() == u'LOCAL': return call_local_broker_agent(username, mode='listsessions') else: return call_remote_broker_agent(username, mode='listsessions', remote_agent=remote_agent) +def suspend_session(username, session_name, query_mode='LOCAL', remote_agent=None): + """\ + Trigger a session suspension via the X2Go Broker Agent. + + @param username: suspend the session on behalf of this username + @type username: C{unicode} + @param query_mode: query mode used when calling X2Go Broker Agent (C{LOCAL} or C{SSH}) + @type query_mode: C{unicode} + @param remote_agent: information about the remote agent that is to be called. 
+ @type remote_agent: C{dict} + + """ + if unicode(query_mode).upper() == u'LOCAL': + return call_local_broker_agent(username, mode='suspendsession', cmdline_args=[session_name, ], ) + else: + return call_remote_broker_agent(username, mode='suspendsession', cmdline_args=[session_name, ], remote_agent=remote_agent) + + +def terminate_session(username, session_name, query_mode='LOCAL', remote_agent=None): + """\ + Trigger a session termination via the X2Go Broker Agent. + + @param username: terminate the session on behalf of this username + @type username: C{unicode} + @param query_mode: query mode used when calling X2Go Broker Agent (C{LOCAL} or C{SSH}) + @type query_mode: C{unicode} + @param remote_agent: information about the remote agent that is to be called. + @type remote_agent: C{dict} + + """ + if unicode(query_mode).upper() == u'LOCAL': + return call_local_broker_agent(username, mode='terminatesession', cmdline_args=[session_name, ], ) + else: + return call_remote_broker_agent(username, mode='terminatesession', cmdline_args=[session_name, ], remote_agent=remote_agent) + + +def has_sessions(username, query_mode='LOCAL', remote_agent=None): + """\ + Query X2Go Broker Agent to detect running/suspended sessions on + the remote X2Go Server (farm). + + @param username: run the query on behalf of this username + @type username: C{unicode} + @param query_mode: query mode used when calling X2Go Broker Agent (C{LOCAL} or C{SSH}) + @type query_mode: C{unicode} + @param remote_agent: information about the remote agent that is to be called. 
+ @type remote_agent: C{dict} + + @return: (<has-running-sessions>, <has-suspended-session>), a tuple of two Boolean values + @rtype: C{tuple} + + """ + _session_list = list_sessions(username, query_mode=query_mode, remote_agent=remote_agent) + return ([ s.split('|')[3] for s in _session_list if s.split('|')[4] == 'R' ], [ s.split('|')[3] for s in _session_list if s.split('|')[4] == 'S' ]) + def find_busy_servers(username, query_mode='LOCAL', remote_agent=None): """\ Query X2Go Broker Agent for a list of servers with running @@ -173,7 +245,7 @@ def find_busy_servers(username, query_mode='LOCAL', remote_agent=None): @type remote_agent: C{dict} """ - if query_mode.upper() == u'LOCAL': + if unicode(query_mode).upper() == u'LOCAL': server_list = call_local_broker_agent(username, mode='findbusyservers') else: server_list = call_remote_broker_agent(username, mode='findbusyservers', remote_agent=remote_agent) @@ -205,7 +277,7 @@ def add_authorized_key(username, pubkey_hash, authorized_keys_file='%h/.x2go/aut @type remote_agent: C{dict} """ - if query_mode.upper() == u'LOCAL': + if unicode(query_mode).upper() == u'LOCAL': return call_local_broker_agent(username, mode='addauthkey', cmdline_args=[pubkey_hash, authorized_keys_file, ]) else: return call_remote_broker_agent(username, mode='addauthkey', cmdline_args=[pubkey_hash, authorized_keys_file, ], remote_agent=remote_agent) @@ -228,7 +300,7 @@ def delete_authorized_key(username, pubkey_hash, authorized_keys_file='%h/.x2go/ """ # this is for the logger output - if remote_agent is None: + if remote_agent in (None, u'LOCAL'): _hostname = 'LOCAL' else: _hostname = remote_agent['hostname'] @@ -237,7 +309,7 @@ def delete_authorized_key(username, pubkey_hash, authorized_keys_file='%h/.x2go/ delayed_execution(delete_authorized_key, delay=delay_deletion, username=username, pubkey_hash=pubkey_hash, authorized_keys_file=authorized_keys_file, query_mode=query_mode, remote_agent=remote_agent, ) logger_broker.debug('Scheduled deletion 
of authorized key in {delay}s: user={user}, host={host}'.format(delay=delay_deletion, user=username, host=_hostname)) else: - if query_mode.upper() == u'LOCAL': + if unicode(query_mode).upper() == u'LOCAL': return call_local_broker_agent(username, mode='delauthkey', cmdline_args=[pubkey_hash, authorized_keys_file, ]) else: return call_remote_broker_agent(username, mode='delauthkey', cmdline_args=[pubkey_hash, authorized_keys_file, ], remote_agent=remote_agent) @@ -257,7 +329,7 @@ def get_servers(username, query_mode='LOCAL', remote_agent=None): @type remote_agent: C{dict} """ - if query_mode.upper() == u'LOCAL': + if unicode(query_mode).upper() == u'LOCAL': return call_local_broker_agent(username, mode='getservers') else: return call_local_broker_agent(username, mode='getservers', remote_agent=remote_agent) diff --git a/x2gobroker/brokers/base_broker.py b/x2gobroker/brokers/base_broker.py index 2738105..d29714e 100644 --- a/x2gobroker/brokers/base_broker.py +++ b/x2gobroker/brokers/base_broker.py @@ -30,6 +30,7 @@ import copy import uuid import netaddr import random +import time # X2Go Broker modules import x2gobroker.config @@ -780,6 +781,50 @@ class X2GoBroker(object): except KeyError: return None + + def get_remote_agent(self, profile_id, exclude_agents=[], ): + """\ + Randomly choose a remote agent for agent query. 
+ + @param profile_id: choose remote agent for this profile ID + @type profile_id: C{unicode} + @param exclude_agents: a list of remote agent dict objects to be excluded from the random choice + @type exclude_agents: C{list} + + """ + remote_agent = None + + agent_query_mode = self.get_agent_query_mode().upper() + if agent_query_mode == u'SSH': + + profile = self.get_profile(profile_id) + server_list = profile[u'host'] + random.shuffle(server_list) + + for agent in exclude_agents: + if agent['hostname'] in server_list: + server_list.remove(agent['hostname']) + + while server_list: + + remote_agent_server = server_list[-1] + remote_agent_port = profile[u'sshport'] + remote_agent = {u'hostname': remote_agent_server, u'port': remote_agent_port, } + + if x2gobroker.agent.ping(query_mode=agent_query_mode, remote_agent=remote_agent): + break + + server_list = server_list[0:-1] + + else: + logger_broker.warning('base_broker.X2GoBroker.get_remote_agent(): failed to allocate any broker agent (query-mode: {query_mode}, remote_agent: {remote_agent})'.format(query_mode=agent_query_mode, remote_agent=remote_agent)) + + elif agent_query_mode == u'LOCAL': + # use a non-False value here, not used anywhere else... + remote_agent = 'LOCAL' + + return remote_agent + def list_profiles(self, username): """\ Retrieve a list of available session profiles for the authenticated user. 
@@ -795,6 +840,10 @@ class X2GoBroker(object): for profile_id in self.get_profile_ids(): profile = self.get_profile(profile_id) + for key in profile.keys(): + if key.startswith('host='): + del profile[key] + if self.use_session_autologin(profile_id): profile['autologin'] = True profile['key'] = 'key-comes-later' @@ -806,34 +855,19 @@ class X2GoBroker(object): acls = self.get_profile_acls(profile_id) if self.check_profile_acls(username, acls): - list_of_profiles.update({profile_id: profile, }) - - return list_of_profiles - def random_remote_agent(self, profile_id, exclude_agents=[]): - """\ - Randomly choose a remote agent for agent query. - - @param profile_id: choose remote agent for this profile ID - @type profile_id: C{unicode} - @param exclude_agents: a list of remote agent dict objects to be exclude from the random choice - @type exclude_agents: C{list} + remote_agent = self.get_remote_agent(profile_id) + agent_query_mode = ( remote_agent == u'LOCAL') and u'LOCAL' or u'SSH' + if remote_agent: + running_sessions, suspended_sessions = x2gobroker.agent.has_sessions(username, query_mode=agent_query_mode, remote_agent=remote_agent) - """ - profile = self.get_profile(profile_id) - server_list = profile[u'host'] + if set(profile['host']).intersection(set(running_sessions)): profile['status'] = u'R' + if set(profile['host']).intersection(set(suspended_sessions)): profile['status'] = u'S' - for agent in exclude_agents: - if agent['hostname'] in server_list: - server_list.remove(agent['hostname']) + profile['host'] = [] + list_of_profiles.update({profile_id: profile, }) - remote_agent = None - if server_list: - random.shuffle(server_list) - remote_agent_server = server_list[0] - remote_agent_port = profile[u'sshport'] - remote_agent = {u'hostname': remote_agent_server, u'port': remote_agent_port, } - return remote_agent + return list_of_profiles def select_session(self, profile_id, username=None): """\ @@ -852,39 +886,24 @@ class X2GoBroker(object): # if we have more 
than one server, pick one server randomly for X2Go Broker Agent queries server_list = profile[u'host'] - random.shuffle(server_list) - agent_query_mode = self.get_agent_query_mode().upper() + remote_agent = self.get_remote_agent(profile_id) - remote_agent = None - if agent_query_mode == u'SSH': - remote_agent = self.random_remote_agent(profile_id) + if remote_agent: + agent_query_mode = ( remote_agent == u'LOCAL') and u'LOCAL' or u'SSH' # detect best X2Go server for this user if load balancing is configured - if agent_query_mode != u'NONE' and len(server_list) >= 2 and username: + if remote_agent and len(server_list) >= 2 and username: - # query remote agent for session info, if one of the server's is down, we will try the next one... + # query remote agent for session info busy_servers = None - exclude_agents = [] - while busy_servers is None and remote_agent: - - try: - busy_servers = x2gobroker.agent.find_busy_servers(username=username, query_mode=agent_query_mode, remote_agent=remote_agent) - except x2gobroker.x2gobroker_exceptions.X2GoBrokerAgentException: - logger_broker.warning('base_broker.X2GoBroker.select_session(): failed to query broker agent (quey-mode: {query_mode}, remote_agent: {remote_agent})'.format(query_mode=agent_query_mode, remote_agent=remote_agent)) - - if agent_query_mode == u'SSH': - # mark this agent as bad - exclude_agents.append(remote_agent) - # also remove this agent from the list of available servers as the machine is probably down - server_list.remove(remote_agent['hostname']) - - remote_agent = self.random_remote_agent(profile_id, exclude_agents=exclude_agents) - else: - remote_agent = None + try: + busy_servers = x2gobroker.agent.find_busy_servers(username=username, query_mode=agent_query_mode, remote_agent=remote_agent) + except x2gobroker.x2gobroker_exceptions.X2GoBrokerAgentException: + pass if busy_servers is not None: - # if we do not get until here, then all broker agent calls have failed!!!! 
+ # if we do not get here, we failed to query a valid agent... # when detecting the server load we have to support handling of differing subdomains (config # file vs. server load returned by x2gobroker agent). Best approach: all members of a multi-node @@ -924,7 +943,7 @@ class X2GoBroker(object): best_server = busy_server_list[0][1] else: - logger_broker.warning('base_broker.X2GoBroker.select_session(): all expected broker agents failed to respond, this does not look good. We tried these agent hosts: {agent_hosts}'.format(agent_hosts=unicode(server_list))) + logger_broker.warning('base_broker.X2GoBroker.select_session(): no broker agent could be contacted, this does not look good. We tried these agent hosts: {agent_hosts}'.format(agent_hosts=unicode(server_list))) if server_list: best_server = server_list[0] else: return { 'server': 'no-server-available', 'port': profile[u'sshport'], } @@ -943,16 +962,32 @@ class X2GoBroker(object): 'port': profile[u'sshport'], } - # find already running sessions and resume the first one found - if agent_query_mode != u'NONE' and len(server_list) >= 2 and username: + # find already running/suspended sessions and resume the first one found + if remote_agent and server_list and username: session_list = x2gobroker.agent.list_sessions(username=username, query_mode=agent_query_mode, remote_agent=remote_agent) if session_list: # if resuming, always select the first session in the list, there should only be one suspended session try: - server_name = session_list[0].split('|')[3] - session_info = session_list[0] + running_sessions = [] + suspended_sessions = [] + for session_info in session_list: + if session_info.split('|')[4] == 'R': + running_sessions.append(session_info) + if session_info.split('|')[4] == 'S': + suspended_sessions.append(session_info) + + # we prefer suspended sessions for resuming if we find sessions with both states of activity + if suspended_sessions: + session_info = suspended_sessions[0] + else: + session_info = 
running_sessions[0] + x2gobroker.agent.suspend_session(username=username, session_name=session_info.split('|')[1], query_mode=agent_query_mode) + # this is the turn-around time of x2gocleansessions, so wait as long as the daemon needs to suspend the session + time.sleep(2) + session_info = session_info.replace('|R|', '|S|') + server_name = session_info.split('|')[3] # if we have an explicit IP address for server_name, let's use that instead... try: @@ -972,7 +1007,7 @@ class X2GoBroker(object): # session autologin feature - if agent_query_mode != u'NONE' and self.use_session_autologin(profile_id): + if remote_agent and self.use_session_autologin(profile_id) and username: # FIXME: we somehow have to find out about the username of the person at the broker client-side... # using the username used for broker login for now... hooks/post-receive -- x2gobroker.git (HTTP(S) Session broker for X2Go) This is an automated email from the git hooks/post-receive script. It was generated because a ref change was pushed to the repository containing the project "x2gobroker.git" (HTTP(S) Session broker for X2Go).