[X2Go-Commits] [x2gobroker] 03/03: Don't return X2Go Servers that are actually down, currently.

git-admin at x2go.org git-admin at x2go.org
Tue Dec 9 13:08:42 CET 2014


This is an automated email from the git hooks/post-receive script.

x2go pushed a commit to branch master
in repository x2gobroker.

commit e4be2b012706014d49a330c2a1058f85a5f9977d
Author: Mike Gabriel <mike.gabriel at das-netzwerkteam.de>
Date:   Tue Dec 9 13:08:25 2014 +0100

    Don't return X2Go Servers that are actually down, currently.
    
        The X2Go Servers get probed via a short portscan on the remote's SSH port. If
        that portscan fails, another remote X2Go Server is chosen from the
        list of available server (if any). This portscanning functionality
        can be switched off via "default-portscan-x2goservers" in x2gobroker.conf
        or via "broker-portscan-x2goservers" per session profile. (Fixes:
        #692).
---
 debian/changelog                        |   12 +-
 etc/x2gobroker.conf                     |   13 ++
 x2gobroker/agent.py                     |   58 ++---
 x2gobroker/brokers/base_broker.py       |  363 +++++++++++++++++++------------
 x2gobroker/defaults.py                  |    1 +
 x2gobroker/tests/test_broker_agent.py   |  130 ++++++++++-
 x2gobroker/tests/test_broker_inifile.py |   89 ++++++++
 x2gobroker/utils.py                     |   47 +++-
 8 files changed, 522 insertions(+), 191 deletions(-)

diff --git a/debian/changelog b/debian/changelog
index 02ed4b3..94f5c22 100644
--- a/debian/changelog
+++ b/debian/changelog
@@ -29,7 +29,7 @@ x2gobroker (0.0.3.0-0x2go1) UNRELEASED; urgency=low
     - Provide a test function that checks if the basic broker agent setup
       (SSH private/public key pair) is available. If not, no SSH broker
       usage will be attempted.
-    - Let an ICMP ping request precede the SSH ping command. This notably
+    - Let a portscan preceed the SSH ping command. This notably
       reduces timeout duration if the host running the queried broker agent
       is down).
     - Catch RequestHandler errors and write them to the error log channel.
@@ -202,7 +202,15 @@ x2gobroker (0.0.3.0-0x2go1) UNRELEASED; urgency=low
     - Enable basic/random load-balancing for UCCS broker frontend. Make UCCS
       frontend aware of host session profile options of the form
       "host=<fqdn> (<ipaddr>:<port>).
-    - Do an ICMP ping before querying a remote agent via SSH.
+    - Do a portscan on the remote's SSH port before querying a remote agent
+      via SSH.
+    - Don't return X2Go Servers that are actually down, currently. The X2Go
+      Servers get probed via a short portscan on the remote's SSH port. If
+      that portscan fails, another remote X2Go Server is chosen from the
+      list of available server (if any). This portscanning functionality
+      can be switched off via "default-portscan-x2goservers" in x2gobroker.conf
+      or via "broker-portscan-x2goservers" per session profile. (Fixes:
+      #692).
   * debian/control:
     + Provide separate bin:package for SSH brokerage: x2gobroker-ssh.
     + Replace LDAP support with session brokerage support in LONG_DESCRIPTION.
diff --git a/etc/x2gobroker.conf b/etc/x2gobroker.conf
index b6ed8a8..199ff0c 100644
--- a/etc/x2gobroker.conf
+++ b/etc/x2gobroker.conf
@@ -233,6 +233,19 @@
 # below value is the default.
 #default-agent-query-mode=NONE
 
+# Probe SSH port of X2Go Servers (availability check)
+#
+# Just before offering an X2Go Server address to a broker client, the
+# X2Go Broker host can probe the X2Go Server's SSH port. In load balancing
+# setups this assures that the offered X2Go Server is really up and running.
+#
+# However, this requires that the broker host can SSH into the X2Go server
+# (this may not be the case in all thinkable firewall setups).
+#
+# Per default, we set this to "true" here. The portscan feature can be
+# deactivated on a per-session-profile basis (use: broker-portscan-x2goservers =
+# false in the session profile configuration).
+#default-portscan-x2goservers = true
 
 ###
 ### Auth Mechs section
diff --git a/x2gobroker/agent.py b/x2gobroker/agent.py
index 531c407..94c8d1d 100644
--- a/x2gobroker/agent.py
+++ b/x2gobroker/agent.py
@@ -36,6 +36,7 @@ x2gobroker._paramiko.monkey_patch_paramiko()
 # X2Go Broker modules
 import x2gobroker.defaults
 import x2gobroker.x2gobroker_exceptions
+import x2gobroker.utils
 from x2gobroker.loggers import logger_broker, logger_error
 
 tasks = {}
@@ -168,7 +169,12 @@ def _call_remote_broker_agent(username, task, cmdline_args=[], remote_agent=None
         remote_agent['host_key_policy'] = paramiko.WarningPolicy()
 
     remote_hostname = remote_agent[u'hostname']
-    if icmp_ping(remote_hostname):
+    if remote_agent.has_key(u'port'):
+        remote_port = int(remote_agent[u'port'])
+    else:
+        remote_port = 22
+
+    if x2gobroker.utils.portscan(remote_hostname, remote_port):
         cmd_line = [
             '{x2gobroker_agent_binary}'.format(x2gobroker_agent_binary=x2gobroker.defaults.X2GOBROKER_AGENT_CMD),
             '{username}'.format(username=username),
@@ -179,11 +185,6 @@ def _call_remote_broker_agent(username, task, cmdline_args=[], remote_agent=None
             cmd_line.append('"{arg}"'.format(arg=cmdline_arg))
 
         remote_username = x2gobroker.defaults.X2GOBROKER_AGENT_USER
-        if remote_agent.has_key(u'port'):
-            remote_port = int(remote_agent[u'port'])
-        else:
-            remote_port = 22
-
         # now, connect and use paramiko Client to negotiate SSH2 across the connection
         try:
             client = paramiko.SSHClient()
@@ -219,34 +220,6 @@ def _call_remote_broker_agent(username, task, cmdline_args=[], remote_agent=None
         raise x2gobroker.x2gobroker_exceptions.X2GoBrokerAgentException('Could not ping remote X2Go Broker Agent host ({remote_hostname})'.format(remote_hostname=remote_hostname))
 
 
-def icmp_ping(hostname):
-    """\
-    Perform an ICMP ping to the requested hostname.
-
-    param hostname: name of the host that shall be pinged
-    type hostname: C{unicode}
-
-    @raise X2GoBrokerAgentException: raised if host is down or unresolvable via DNS.
-
-    """
-    # Detect the IP protocol version...
-    try:
-        dns_query = socket.getaddrinfo(hostname, None, socket.AF_INET6)
-        ping_cmd = "ping6"
-    except socket.gaierror:
-        try:
-            dns_query = socket.getaddrinfo(hostname, None, socket.AF_INET)
-            ping_cmd = "ping"
-        except socket.gaierror:
-            # we can't find a valid address for this host, so returning a failure...
-            return False
-
-    # do the actual ping...
-    if os.system(ping_cmd + " -c 1 -w2 " + hostname + " > /dev/null 2>&1") != 0:
-        return False
-
-    return True
-
 
 def ping(remote_agent=None, **kwargs):
     """\
@@ -261,7 +234,7 @@ def ping(remote_agent=None, **kwargs):
         return _call_local_broker_agent(username)[0]
     else:
         return remote_agent is not None and \
-               icmp_ping(remote_agent['hostname']) and \
+               x2gobroker.utils.portscan(remote_agent['hostname'], remote_agent['port']) and \
                _call_remote_broker_agent(username, task='ping', remote_agent=remote_agent)[0]
 tasks['ping'] = ping
 
@@ -343,17 +316,14 @@ def find_busy_servers(username, remote_agent=None, **kwargs):
     @type remote_agent: C{dict}
 
     """
-    server_list = call_broker_agent(username, task='findbusyservers', remote_agent=remote_agent, **kwargs)[1]
+    _success, server_list = call_broker_agent(username, task='findbusyservers', remote_agent=remote_agent, **kwargs)
 
     server_usage = {}
-
-    if server_list:
-        if type(server_list) is types.ListType:
-            _success = True
-            for server_item in server_list:
-                if ':' in server_item:
-                    usage, server = server_item.split(':')
-                    server_usage.update({ server: int(usage) })
+    if server_list and type(server_list) is types.ListType:
+        for server_item in server_list:
+            if ':' in server_item:
+                usage, server = server_item.split(':')
+                server_usage.update({ server: int(usage) })
     else:
         _success = False
 
diff --git a/x2gobroker/brokers/base_broker.py b/x2gobroker/brokers/base_broker.py
index 81addff..afb8ce2 100644
--- a/x2gobroker/brokers/base_broker.py
+++ b/x2gobroker/brokers/base_broker.py
@@ -581,6 +581,8 @@ class X2GoBroker(object):
         _profile = self.get_profile_broker(profile_id)
         if _profile and _profile.has_key(u'broker-session-autologin') and _profile['broker-session-autologin']:
             _session_autologin = _profile[u'broker-session-autologin']
+            if type(_session_autologin) in (types.StringType, types.UnicodeType):
+                _session_autologin = _session_autologin.lower() in ('1', 'true')
             logger_broker.debug('base_broker.X2GoBroker.get_session_autologin(): found broker-session-autologin in session profile with ID {id}: {value}. This one has precendence over the default value.'.format(id=profile_id, value=_session_autologin))
 
         elif self.config.has_value('global', 'default-session-autologin'):
@@ -591,6 +593,33 @@ class X2GoBroker(object):
     # API compat name:
     use_session_autologin = get_session_autologin
 
+    def get_portscan_x2goservers(self, profile_id):
+        """\
+        Detect if the given profile is configured to try portscanning on X2Go Servers
+        before offering an X2Go Server hostname to the client.
+
+        @return: C{True} if X2Go Servers shall be probed before offering it to clients
+        @rtype: C{bool}
+
+        """
+        _default_portscan_x2goservers = False
+        _portscan_x2goservers = False
+
+        _profile = self.get_profile_broker(profile_id)
+        if _profile and _profile.has_key(u'broker-portscan-x2goservers') and _profile['broker-portscan-x2goservers']:
+            _portscan_x2goservers = _profile[u'broker-portscan-x2goservers']
+            if type(_portscan_x2goservers) in (types.StringType, types.UnicodeType):
+                _portscan_x2goservers = _portscan_x2goservers.lower() in ('1', 'true')
+            logger_broker.debug('base_broker.X2GoBroker.get_portscan_x2goservers(): found broker-portscan-x2goservers in session profile with ID {id}: {value}. This one has precendence over the default value.'.format(id=profile_id, value=_portscan_x2goservers))
+
+        elif self.config.has_value('global', 'default-portscan-x2goservers'):
+            _default_portscan_x2goservers = self.config.get_value('global', 'default-portscan-x2goservers')
+            logger_broker.debug('base_broker.X2GoBroker.get_portscan_x2goservers(): found default-portscan-x2goservers in global config section: {value}'.format(value=_default_portscan_x2goservers))
+
+        return _portscan_x2goservers or _default_portscan_x2goservers
+    # API compat name:
+    use_portscan_x2goservers = get_portscan_x2goservers
+
     def get_authorized_keys_file(self, profile_id):
         """\
         Get the default location of server-side authorized_keys files used with
@@ -992,7 +1021,8 @@ class X2GoBroker(object):
                     if x2gobroker.agent.ping(remote_agent=remote_agent):
                         break
                 except x2gobroker.x2gobroker_exceptions.X2GoBrokerAgentException:
-                    remote_agent = None
+                    # at the end of this loop, an empty dict means: no X2Go Server could be contacted!!!
+                    remote_agent = {}
 
                 server_list = server_list[0:-1]
 
@@ -1149,158 +1179,215 @@ class X2GoBroker(object):
                 session_list = []
 
         session_info = None
-        if session_list:
-
-            # Obviously a remote broker agent reported an already running session
-            # on the / on one the available X2Go Server host(s)
 
-            # When resuming, always select the first session in the list,
-            # there should only be one running/suspended session by design
-            # of X2Go brokerage (this may change in the future)
-            try:
-                running_sessions = []
-                suspended_sessions = []
-
-                matching_server_names = x2gobroker.utils.matching_hostnames(server_list, [ si.split('|')[3] for si in session_list ])
-
-                for session_info in session_list:
-                    if session_info.split('|')[3] in matching_server_names:
-                        if session_info.split('|')[4] == 'R':
-                            running_sessions.append(session_info)
-                        if session_info.split('|')[4] == 'S':
-                            suspended_sessions.append(session_info)
-
-                if suspended_sessions or running_sessions:
-                    # we prefer suspended sessions over resuming sessions if we find sessions with both
-                    # states of activity
-                    if suspended_sessions:
-                        session_info = suspended_sessions[0]
-                    elif running_sessions:
-                        session_info = running_sessions[0]
-                        x2gobroker.agent.suspend_session(username=username, session_name=session_info.split('|')[1], remote_agent=remote_agent)
-                        # this is the turn-around in x2gocleansessions, so waiting as along as the daemon
-                        # that will suspend the session
-                        time.sleep(2)
-                        session_info = session_info.replace('|R|', '|S|')
-
-                    # only use the server's official hostname (as set on the server)
-                    # if we have been provided with a physical server address.
-                    # If no physical server address has been provided, we have to use
-                    # the host address as found in server_list (and hope we can connect
-                    # to that address.
-                    _session_server_name = session_info.split('|')[3]
-                    if profile.has_key('host={server_name}'.format(server_name=_session_server_name)):
-                        server_name = _session_server_name
-                    elif _session_server_name in server_list:
-                        server_name = _session_server_name
-                    elif x2gobroker.utils.matching_hostnames(server_list, [_session_server_name]):
-                        for _server_name in server_list:
-                            if _server_name.startswith(_session_server_name):
-                                server_name = _server_name
-                                break
-                    else:
-                        logger_broker.error('base_broker.X2GoBroker.select_session(): configuration error. Hostnames in session profile and actual server names do not match, we won\'t be able to resume/take-over a session this time')
-                        # choosing a random server from the server list, to end up anywhere at least...
-                        server_name = random.choice(server_list)
+        selected_session = {}
+        busy_servers = None
+        _save_server_list = None
+        _save_busy_servers = None
+        initial_server_list = copy.deepcopy(server_list)
+        while not selected_session and server_list:
 
-            except IndexError:
-                # FIXME: if we get here, we have to deal with a broken session info
-                # entry in the X2Go session database. -> AWFUL!!!
-                pass
-
-        # detect best X2Go server for this user if load balancing is configured
-        elif remote_agent and len(server_list) >= 2 and username:
-
-            # No running / suspended session was found on any of the available
-            # X2Go Servers. Thus, we will try to detect the best server for this
-            # load balanced X2Go Server farm.
+            if remote_agent == {}:
 
-            # query remote agent on how busy our servers are...
-            busy_servers = None
-            try:
-                success, busy_servers = x2gobroker.agent.find_busy_servers(username=username, remote_agent=remote_agent)
-            except x2gobroker.x2gobroker_exceptions.X2GoBrokerAgentException:
-                pass
+                # we failed to contact any remote agent, so it is very likely, that all servers are down...
+                server_list = []
 
-            if busy_servers is not None:
+            elif session_list:
 
-                # if we do not get here, we failed to query a valid agent...
+                # Obviously a remote broker agent reported an already running session
+                # on the / on one the available X2Go Server host(s)
 
-                # when detecting the server load we have to support handling of differing subdomains (config
-                # file vs. server load returned by x2gobroker agent). Best approach: all members of a multi-node
-                # server farm either
-                #
-                #  (a) do not have a subdomain in their hostname or
-                #  (b) have an identical subdomain in their hostnames
-
-                # Example:
-                #
-                #    ts01, ts02 - hostnames as returned by agent
-                #    ts01.intern, ts02.intern - hostnames configured in session profile option ,,host''
-                #    -> this will result in the subdomain .intern being stripped off from the hostnames before
-                #       detecting the best server for this user
-
-                ### NORMALIZE (=reduce to hostname only) X2Go server names (as found in config) if possible
-                server_list_normalized, subdomains_config = x2gobroker.utils.normalize_hostnames(server_list)
-
-                ### NORMALIZE X2Go server names (as returned by broker agent)--only if the hostnames in
-                # the config share the same subdomain
-                if len(subdomains_config) == 1:
-
-                    busy_servers_normalized, subdomains_agent = x2gobroker.utils.normalize_hostnames(busy_servers)
-                    if len(subdomains_agent) <= 1:
-                        # all X2Go servers in the multi-node server farm are in the same DNS subdomain
-                        # we can operate on hostname-only hostnames
-                        server_list = server_list_normalized
-                        busy_servers = busy_servers_normalized
-
-                for server in server_list:
-                    if server not in busy_servers.keys():
-                        busy_servers[server] = 0
-
-                busy_server_list = [ (load, server) for server, load in busy_servers.items() ]
-                busy_server_list.sort()
-
-                logger_broker.debug('base_broker.X2GoBroker.select_session(): load balancer analysis: {server_load}'.format(server_load=unicode(busy_server_list)))
+                # When resuming, always select the first session in the list,
+                # there should only be one running/suspended session by design
+                # of X2Go brokerage (this may change in the future)
+                try:
+                    running_sessions = []
+                    suspended_sessions = []
+
+                    matching_server_names = x2gobroker.utils.matching_hostnames(server_list, [ si.split('|')[3] for si in session_list ])
+
+                    for session_info in session_list:
+                        if session_info.split('|')[3] in matching_server_names:
+                            if session_info.split('|')[4] == 'R':
+                                running_sessions.append(session_info)
+                            if session_info.split('|')[4] == 'S':
+                                suspended_sessions.append(session_info)
+
+                    if suspended_sessions or running_sessions:
+                        # we prefer suspended sessions over resuming sessions if we find sessions with both
+                        # states of activity
+                        if suspended_sessions:
+                            session_info = suspended_sessions[0]
+                        elif running_sessions:
+                            session_info = running_sessions[0]
+                            x2gobroker.agent.suspend_session(username=username, session_name=session_info.split('|')[1], remote_agent=remote_agent)
+                            # this is the turn-around in x2gocleansessions, so waiting as along as the daemon
+                            # that will suspend the session
+                            time.sleep(2)
+                            session_info = session_info.replace('|R|', '|S|')
+
+                        # only use the server's official hostname (as set on the server)
+                        # if we have been provided with a physical server address.
+                        # If no physical server address has been provided, we have to use
+                        # the host address as found in server_list (and hope we can connect
+                        # to that address.
+                        _session_server_name = session_info.split('|')[3]
+                        if profile.has_key('host={server_name}'.format(server_name=_session_server_name)):
+                            server_name = _session_server_name
+                        elif _session_server_name in server_list:
+                            server_name = _session_server_name
+                        elif x2gobroker.utils.matching_hostnames(server_list, [_session_server_name]):
+                            for _server_name in server_list:
+                                if _server_name.startswith(_session_server_name):
+                                    server_name = _server_name
+                                    break
+                        else:
+                            logger_broker.error('base_broker.X2GoBroker.select_session(): configuration error. Hostnames in session profile and actual server names do not match, we won\'t be able to resume/take-over a session this time')
+                            # choosing a random server from the server list, to end up anywhere at least...
+                            server_name = random.choice(server_list)
+
+                except IndexError:
+                    # FIXME: if we get here, we have to deal with a broken session info
+                    # entry in the X2Go session database. -> AWFUL!!!
+                    pass
 
-                server_name = busy_server_list[0][1]
+            # detect best X2Go server for this user if load balancing is configured
+            elif remote_agent and len(server_list) >= 2 and username:
+
+                # No running / suspended session was found on any of the available
+                # X2Go Servers. Thus, we will try to detect the best server for this
+                # load balanced X2Go Server farm.
+
+                # query remote agent on how busy our servers are... (if a selected server is down
+                # and we come through here again, don't query business state again, use the remembered
+                # status)
+                if busy_servers is None:
+                    try:
+                        success, busy_servers = x2gobroker.agent.find_busy_servers(username=username, remote_agent=remote_agent)
+                    except x2gobroker.x2gobroker_exceptions.X2GoBrokerAgentException:
+                        pass
+
+                if busy_servers is not None:
+
+                    # if we do not get here, we failed to query a valid agent...
+
+                    # when detecting the server load we have to support handling of differing subdomains (config
+                    # file vs. server load returned by x2gobroker agent). Best approach: all members of a multi-node
+                    # server farm either
+                    #
+                    #  (a) do not have a subdomain in their hostname or
+                    #  (b) have an identical subdomain in their hostnames
+
+                    # Example:
+                    #
+                    #    ts01, ts02 - hostnames as returned by agent
+                    #    ts01.intern, ts02.intern - hostnames configured in session profile option ,,host''
+                    #    -> this will result in the subdomain .intern being stripped off from the hostnames before
+                    #       detecting the best server for this user
+
+                    ### NORMALIZE (=reduce to hostname only) X2Go server names (as found in config) if possible
+                    server_list_normalized, subdomains_config = x2gobroker.utils.normalize_hostnames(server_list)
+
+                    ### NORMALIZE X2Go server names (as returned by broker agent)--only if the hostnames in
+                    # the config share the same subdomain
+                    if len(subdomains_config) == 1:
+
+                        busy_servers_normalized, subdomains_agent = x2gobroker.utils.normalize_hostnames(busy_servers)
+                        if len(subdomains_agent) <= 1:
+                            # all X2Go servers in the multi-node server farm are in the same DNS subdomain
+                            # we can operate on hostname-only hostnames
+                            _save_server_list = copy.deepcopy(server_list)
+                            _save_busy_servers = copy.deepcopy(busy_servers)
+                            server_list = server_list_normalized
+                            busy_servers = busy_servers_normalized
+
+                    # the list of busy_servers only shows servers with session, but not those servers that are entirely idle...
+                    for server in server_list:
+                        if server not in busy_servers.keys():
+                            busy_servers[server] = 0
+
+                    # we will only contact servers that are (still) in server_list
+                    for busy_server in busy_servers.keys():
+                        if busy_server not in server_list:
+                            del busy_servers[busy_server]
+
+                    busy_server_list = [ (load, server) for server, load in busy_servers.items() ]
+                    busy_server_list.sort()
+
+                    logger_broker.debug('base_broker.X2GoBroker.select_session(): load balancer analysis: {server_load}'.format(server_load=unicode(busy_server_list)))
+
+                    server_name = busy_server_list[0][1]
+
+                    # this makes sure we allow back-translation of hostname to host address
+                    # when the format "<hostname> (<ip-address>)" ist used in the hosts field...
+                    if len(subdomains_config) == 1:
+                        server_name += '.{domain}'.format(domain=subdomains_config[0])
+
+                    if _save_server_list:
+                        server_list = copy.deepcopy(_save_server_list)
+                        _save_server_list = None
+                    if _save_busy_servers:
+                        busy_servers = copy.deepcopy(_save_busy_servers)
+                        _save_busy_servers = None
 
-                # this makes sure we allow back-translation of hostname to host address
-                # when the format "<hostname> (<ip-address>)" ist used in the hosts field...
-                if len(subdomains_config) == 1:
-                    server_name += '.{domain}'.format(domain=subdomains_config[0])
+                else:
+                    logger_broker.warning('base_broker.X2GoBroker.select_session(): no broker agent could be contacted, this does not look good. We tried these agent hosts: {agent_hosts}'.format(agent_hosts=unicode(initial_server_list)))
 
-            else:
-                logger_broker.warning('base_broker.X2GoBroker.select_session(): no broker agent could be contacted, this does not look good. We tried these agent hosts: {agent_hosts}'.format(agent_hosts=unicode(server_list)))
+            # detect best X2Go server for this user if load balancing is configured
+            elif len(server_list) >= 2:
 
-        # detect best X2Go server for this user if load balancing is configured
-        elif len(server_list) >= 2:
+                # no remote broker agent or no username? Let's play roulette then...
+                server_name = random.choice(server_list)
 
-            # no remote broker agent or no username? Let's play roulette then...
-            server_name = random.choice(server_list)
+            ###
+            ### by now we should know the proper host to connect to...
+            ###
 
-        ###
-        ### by now we should know the proper host to connect to...
-        ###
+            server_addr = server_name
+            # if we have an explicit TCP/IP port server_name, let's use that instead...
+            try:
+                server_port = profile['sshport={hostname}'.format(hostname=server_name)]
+                logger_broker.debug('base_broker.X2GoBroker.select_session(): use physical server port: {port}'.format(port=server_port))
+            except KeyError:
+                pass
 
-        # if we have an explicit TCP/IP port server_name, let's use that instead...
-        try:
-            server_port = profile['sshport={hostname}'.format(hostname=server_name)]
-            logger_broker.debug('base_broker.X2GoBroker.select_session(): use physical server port: {port}'.format(port=server_port))
-        except KeyError:
-            pass
+            # if we have an explicit TCP/IP address for server_name, let's use that instead...
+            try:
+                server_addr = profile['host={hostname}'.format(hostname=server_name)]
+                logger_broker.debug('base_broker.X2GoBroker.select_session(): use physical server address: {address}'.format(address=server_addr))
+            except KeyError:
+                pass
 
-        # if we have an explicit TCP/IP address for server_name, let's use that instead...
-        try:
-            server_name = profile['host={hostname}'.format(hostname=server_name)]
-            logger_broker.debug('base_broker.X2GoBroker.select_session(): use physical server address: {address}'.format(address=server_name))
-        except KeyError:
-            pass
+            if server_list:
+                if not self.get_portscan_x2goservers(profile_id) or x2gobroker.utils.portscan(addr=server_name, port=server_port) or x2gobroker.utils.portscan(addr=server_addr, port=server_port):
+                    selected_session = {
+                        'server': server_addr,
+                        'port': server_port,
+                    }
+                else:
+                    server_list.remove(server_name)
 
-        selected_session = {
-            'server': server_name,
-            'port': server_port,
-        }
+                    # pick remaining server from server list (if any)
+                    if server_list:
+                        logger_broker.warning('base_broker.X2GoBroker.select_session(): failed to contact host \'{down_server}\', trying next server \'{next_server}\''.format(down_server=server_name, next_server=server_list[0]))
+                        server_name = server_list[0]
+                    else:
+                        logger_broker.error('base_broker.X2GoBroker.select_session(): no X2Go Server could be contacted, session startup will fail, tried these hosts: {server_list}'.format(server_list=unicode(initial_server_list)))
+
+                    # If we arrive here and session_list carries an entry for this user, then the session DB probably still
+                    # carries a zombie session entry (that will disappear when the down X2Go Server comes up again (cleanup
+                    # via x2gocleansessions).
+                    #
+                    # We have to presume that any running/suspended session we received from the remote agent
+                    # is gone... Let's assign a new session...
+                    session_list = []
+                    session_info = None
+
+        if not selected_session and not server_list:
+            selected_session = {
+                'server': 'no-X2Go-Server-available',
+                'port': server_port,
+            }
 
         # are we resuming a running/suspended session?
         if session_info is not None:
diff --git a/x2gobroker/defaults.py b/x2gobroker/defaults.py
index ef96e82..55cc287 100644
--- a/x2gobroker/defaults.py
+++ b/x2gobroker/defaults.py
@@ -209,6 +209,7 @@ X2GOBROKER_CONFIG_DEFAULTS = {
         u'default-authorized-keys': u'%h/.x2go/authorized_keys',
         u'default-sshproxy-authorized-keys': u'%h/.x2go/authorized_keys',
         u'default-agent-query-mode': u'NONE',
+        u'default-portscan-x2goservers': True,
     },
     'broker_base': {
         u'enable': False,
diff --git a/x2gobroker/tests/test_broker_agent.py b/x2gobroker/tests/test_broker_agent.py
index 22c83f1..2bdab1e 100644
--- a/x2gobroker/tests/test_broker_agent.py
+++ b/x2gobroker/tests/test_broker_agent.py
@@ -30,9 +30,9 @@ class TestX2GoBrokerAgent(unittest.TestCase):
     # TEST INTERPRETATION OF REPLIES FROM (FAKED) BROKER AGENT
 
     def test_broker_agent_replies(self):
-        _save_icmp_ping = x2gobroker.agent.icmp_ping
         _save_local_broker_agent_call = x2gobroker.agent._call_local_broker_agent
         _save_remote_broker_agent_call = x2gobroker.agent._call_remote_broker_agent
+        _save_portscan = x2gobroker.utils.portscan
 
         def _call_testsuite_broker_agent(username, task, cmdline_args=[], remote_agent=None):
 
@@ -64,12 +64,12 @@ class TestX2GoBrokerAgent(unittest.TestCase):
 
             return False, []
 
-        def _fake_icmp_ping(hostname):
+        def _fake_portscan(addr, port=22):
             return True
 
         x2gobroker.agent._call_local_broker_agent = _call_testsuite_broker_agent
         x2gobroker.agent._call_remote_broker_agent = _call_testsuite_broker_agent
-        x2gobroker.agent.icmp_ping = _fake_icmp_ping
+        x2gobroker.utils.portscan = _fake_portscan
 
         _session_profiles = """
 [DEFAULT]
@@ -243,7 +243,129 @@ broker-agent-query-mode = SSH
 
         x2gobroker.agent._call_local_broker_agent = _save_local_broker_agent_call
         x2gobroker.agent._call_remote_broker_agent = _save_remote_broker_agent_call
-        x2gobroker.agent.icmp_ping = _save_icmp_ping
+        x2gobroker.utils.portscan = _save_portscan
+
+    def test_broker_agent_replies_with_offline_servers(self):
+
+        _save_local_broker_agent_call = x2gobroker.agent._call_local_broker_agent
+        _save_remote_broker_agent_call = x2gobroker.agent._call_remote_broker_agent
+        _save_portscan = x2gobroker.utils.portscan
+
+        def _call_testsuite_broker_agent(username, task, cmdline_args=[], remote_agent=None):
+
+            if task == 'listsessions':
+                list_sessions = []
+                if username == 'foo4BS1':
+                    list_sessions = ['30342|foo1S-50-1414759661_stDMATE_dp24|50|host3-with-session|S|2014-10-31T13:47:41|c02c7bbe58677a2726f7e456cb398ae4|127.0.0.1|30001|30002|2014-10-31T13:47:43|foo1S|34|30003|-1|-1',
+                    ]
+                elif username == 'foo4BS2':
+                    list_sessions = ['30342|foo1S-50-1414759661_stDMATE_dp24|50|downhost1-with-session|S|2014-10-31T13:47:41|c02c7bbe58677a2726f7e456cb398ae4|127.0.0.1|30001|30002|2014-10-31T13:47:43|foo1S|34|30003|-1|-1',
+                    ]
+                return True, list_sessions
+
+            elif task == 'suspendsession':
+                return True
+
+            elif task == 'findbusyservers':
+                busy_servers = []
+                if username == 'fooBS1':
+                    busy_servers = [
+                        '7:host1.internal',
+                        '2:host2.internal',
+                        '1:host3.internal',
+                    ]
+                elif username == 'foo4BS1':
+                    busy_servers = [
+                        '2:downhost1-with-session.internal',
+                        '1:host2.internal',
+                        '3:host3-with-session.internal',
+                    ]
+                return True, busy_servers
+
+            return False, []
+
+        def _fake_portscan(addr, port=22):
+            if addr == 'host3.internal':
+                return False
+            if addr.startswith('downhost'):
+                return False
+            return True
+
+        x2gobroker.agent._call_local_broker_agent = _call_testsuite_broker_agent
+        x2gobroker.agent._call_remote_broker_agent = _call_testsuite_broker_agent
+        x2gobroker.utils.portscan = _fake_portscan
+
+        _session_profiles = """
+[DEFAULT]
+command = MATE
+user = foo
+broker-agent-query-mode = NONE
+
+[testprofile1]
+name = testprofile1
+host = host1.internal, host2.internal, host3.internal
+broker-agent-query-mode = LOCAL
+broker-portscan-x2goservers = false
+
+[testprofile2]
+name = testprofile1
+host = host1.internal, host2.internal, host3.internal
+broker-agent-query-mode = LOCAL
+broker-portscan-x2goservers = true
+
+[testprofile3]
+name = testprofile3
+host = downhost1.internal, downhost2.internal, downhost3.internal
+broker-agent-query-mode = LOCAL
+broker-portscan-x2goservers = true
+
+[testprofile4]
+name = testprofile4
+host = downhost1-with-session.internal, host2.internal, host3-with-session.internal
+broker-agent-query-mode = LOCAL
+broker-portscan-x2goservers = true
+
+"""
+        tf = tempfile.NamedTemporaryFile()
+        print >> tf, _session_profiles
+        tf.seek(0)
+        inifile_backend = inifile.X2GoBroker(profile_config_file=tf.name)
+
+        i = 0
+        while i < 10:
+            _session1 = inifile_backend.select_session('testprofile1', username='fooBS1')
+            self.assertTrue ( _session1['server'] == 'host3.internal')
+            i += 1
+
+        i = 0
+        while i < 10:
+            _session2 = inifile_backend.select_session('testprofile2', username='fooBS1')
+            self.assertTrue ( _session2['server'] == 'host2.internal')
+            i += 1
+
+        i = 0
+        while i < 10:
+            _session3 = inifile_backend.select_session('testprofile3', username='fooBS1')
+            self.assertTrue ( _session3['server'] == 'no-X2Go-Server-available')
+            i += 1
+
+        i = 0
+        while i < 10:
+            _session4 = inifile_backend.select_session('testprofile4', username='foo4BS1')
+            self.assertTrue ( _session4['server'] == 'host3-with-session.internal')
+            self.assertTrue ( _session4['session_info'] == '30342|foo1S-50-1414759661_stDMATE_dp24|50|host3-with-session|S|2014-10-31T13:47:41|c02c7bbe58677a2726f7e456cb398ae4|127.0.0.1|30001|30002|2014-10-31T13:47:43|foo1S|34|30003|-1|-1' )
+            i += 1
+
+        i = 0
+        while i < 10:
+            _session4 = inifile_backend.select_session('testprofile4', username='foo4BS2')
+            self.assertTrue ( _session4['server'] == 'host2.internal')
+            self.assertFalse ( _session4.has_key('session_info') )
+            i += 1
+
+        x2gobroker.agent._call_local_broker_agent = _save_local_broker_agent_call
+        x2gobroker.agent._call_remote_broker_agent = _save_remote_broker_agent_call
+        x2gobroker.utils.portscan = _save_portscan
 
 
 def test_suite():
diff --git a/x2gobroker/tests/test_broker_inifile.py b/x2gobroker/tests/test_broker_inifile.py
index 8d86e14..c05aa2e 100644
--- a/x2gobroker/tests/test_broker_inifile.py
+++ b/x2gobroker/tests/test_broker_inifile.py
@@ -462,6 +462,11 @@ acl-users-order = deny-allow
     ### TEST: select_session() method
 
     def test_sessionselection(self):
+        _save_portscan = x2gobroker.utils.portscan
+        def _fake_portscan(addr, port=22):
+            return True
+        x2gobroker.utils.portscan = _fake_portscan
+
         _config_defaults = copy.deepcopy(x2gobroker.defaults.X2GOBROKER_CONFIG_DEFAULTS)
         _config = """
 [global]
@@ -546,10 +551,16 @@ host = test-7 (-test-6.extern)
         self.assertEqual(inifile_backend.select_session('testprofile6'), _expected_result_6)
         self.assertEqual(inifile_backend.select_session('testprofile7'), _expected_result_7)
 
+        x2gobroker.utils.portscan = _save_portscan
 
     # TEST MULTI-HOST GET_PROFILE / SELECT_SESSION
 
     def test_multihost_profiles(self):
+        _save_portscan = x2gobroker.utils.portscan
+        def _fake_portscan(addr, port=22):
+            return True
+        x2gobroker.utils.portscan = _fake_portscan
+
         _session_profiles = """
 [DEFAULT]
 command = MATE
@@ -694,6 +705,84 @@ sshport = 22000
             self.assertTrue ( _session8['server'] == 'localhost' )
             i += 1
 
+        x2gobroker.utils.portscan = _save_portscan
+
+    def test_multihost_profiles_with_offline_servers(self):
+        _save_portscan = x2gobroker.utils.portscan
+        def _fake_portscan(addr, port=22):
+            if addr == 'host1.mydomain':
+                return True
+            if addr == 'host2.mydomain':
+                return True
+            if addr == 'host2.internal':
+                return True
+            if addr == 'host3.internal':
+                return True
+            return False
+        x2gobroker.utils.portscan = _fake_portscan
+
+        _session_profiles = """
+[DEFAULT]
+command = MATE
+user = foo
+
+[testprofile1]
+host = host1.mydomain, host2.mydomain, host3.mydomain
+broker-portscan-x2goservers = false
+broker-agent-query-mode = NONE
+
+[testprofile2]
+host = host1.mydomain, host2.mydomain, host3.mydomain
+broker-portscan-x2goservers = true
+broker-agent-query-mode = NONE
+
+[testprofile3]
+host = host1.internal (host1.external), host2.internal (host2.external), host3.internal (host3.external)
+
+"""
+        tf = tempfile.NamedTemporaryFile()
+        print >> tf, _session_profiles
+        tf.seek(0)
+        inifile_backend = inifile.X2GoBroker(profile_config_file=tf.name)
+
+        i = 0
+        selected_hosts = []
+        # we assume that in 1000 test steps we will stumble over all three host names
+        while i < 1000 or len(selected_hosts) != 3:
+            _session1 = inifile_backend.select_session('testprofile1', username='foo')
+            self.assertTrue ( _session1['server'] in ('host1.mydomain', 'host2.mydomain', 'host3.mydomain'))
+            if _session1['server'] not in selected_hosts:
+                selected_hosts.append(_session1['server'])
+            i += 1
+
+        self.assertTrue ( len(selected_hosts) == 3 )
+
+        i = 0
+        selected_hosts = []
+        # we assume that if the code is broken we would receive
+        # the down host (host3.mydomain) within 30 test steps at least once
+        while i <= 30:
+            _session2 = inifile_backend.select_session('testprofile2', username='foo')
+            self.assertTrue ( _session2['server'] in ('host1.mydomain', 'host2.mydomain'))
+            if _session2['server'] not in selected_hosts:
+                selected_hosts.append(_session2['server'])
+            i += 1
+        self.assertTrue ( len(selected_hosts) == 2 )
+
+        i = 0
+        selected_hosts = []
+        # we assume that if the code is broken we would receive
+        # the down host (host1.mydomain/10.0.2.4) within 30 test steps at least once
+        while i <= 30:
+            _session3 = inifile_backend.select_session('testprofile3', username='foo')
+            self.assertTrue ( _session3['server'] in ('host2.external', 'host3.external'))
+            if _session3['server'] not in selected_hosts:
+                selected_hosts.append(_session3['server'])
+            i += 1
+        self.assertTrue ( len(selected_hosts) == 2 )
+
+        x2gobroker.utils.portscan = _save_portscan
+
 
 def test_suite():
     from unittest import TestSuite, makeSuite
diff --git a/x2gobroker/utils.py b/x2gobroker/utils.py
index 002feb5..9d02c47 100644
--- a/x2gobroker/utils.py
+++ b/x2gobroker/utils.py
@@ -24,6 +24,7 @@ import locale
 import netaddr
 import distutils.version
 import pwd, grp
+import socket
 
 def _checkConfigFileDefaults(data_structure):
     """\
@@ -138,9 +139,6 @@ def normalize_hostnames(servers):
             # collect the list of subdomains used in all server names
             if subdomain and subdomain not in subdomains:
                 subdomains.append(subdomain)
-            # stop processing if we have more than one subdomain
-            if len(subdomains) > 1:
-                break
 
     # return the original servers dict/list/tuple
     if len(subdomains) > 1:
@@ -242,3 +240,46 @@ def split_host_address(host, default_address=None, default_port=22):
         bind_address = '[{address}]'.format(address=bind_address)
 
     return bind_address, bind_port
+
+def portscan(addr, port=22):
+    """\
+    Performing a port scan to the requested hostname.
+
+    @param addr: address (IPv4, IPv6 or hostname) of the host
+        we want to probe
+    @type addr: C{unicode}
+    @param port: port number (default: 22)
+    @type addr: C{int}
+
+    """
+    ip_proto = 0
+    try:
+        dns_query = socket.getaddrinfo(addr, None, socket.AF_INET6)
+        ip_proto = 6
+    except socket.gaierror:
+        try:
+            dns_query = socket.getaddrinfo(addr, None, socket.AF_INET)
+            ip_proto = 4
+        except socket.gaierror:
+            # we can't find a valid address for this host, so returning a failure...
+            return False
+
+    if ip_proto == 6 or netaddr.valid_ipv6(addr):
+        sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
+    elif ip_proto == 4 or netaddr.valid_ipv4(addr):
+        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+
+    sock.settimeout(2)
+    try:
+        result = sock.connect_ex((addr, port))
+        if result !=0:
+            sock.close()
+            return False
+    except socket.gaierror:
+        return False
+    except socket.error:
+       return False
+    finally:
+        sock.close()
+
+    return True

--
Alioth's /srv/git/_hooks_/post-receive-email on /srv/git/code.x2go.org/x2gobroker.git


More information about the x2go-commits mailing list