The branch, master has been updated via 88bf826ab2efbc3f23da9a42280cad0ee8928aa1 (commit) from f0df0578d48cb5eb91bfb1e482d68f4937817fce (commit) Those revisions listed above that are new to this repository have not appeared on any other notification email; so we list those revisions in full, below. - Log ----------------------------------------------------------------- commit 88bf826ab2efbc3f23da9a42280cad0ee8928aa1 Author: Mike Gabriel <mike.gabriel@das-netzwerkteam.de> Date: Fri Feb 1 10:48:05 2013 +0100 add client address ACL checks ----------------------------------------------------------------------- Summary of changes: README | 1 + debian/control | 1 + x2gobroker/brokers/base_broker.py | 79 ++++++- x2gobroker/tests/test_broker_base.py | 434 ++++++++++++++++++++++++++++++++++ x2gobroker/web/plain.py | 9 + 5 files changed, 521 insertions(+), 3 deletions(-) The diff of changes is: diff --git a/README b/README index 03d87e4..4675864 100644 --- a/README +++ b/README @@ -11,6 +11,7 @@ Dependencies: * This version X2Go Session Broker works with X2Go Client (>= 4.0.0.0) and Python X2Go (>= 0.5.0.0, not yet available) as client. + * Python modules: python-pampy, python-netaddr Available features: diff --git a/debian/control b/debian/control index 62e4fcf..b70e569 100644 --- a/debian/control +++ b/debian/control @@ -112,6 +112,7 @@ Depends: ${misc:Depends}, ${python:Depends}, python-pampy, + python-netaddr, Suggests: apache2 | httpd, Description: X2Go http(s) based session broker (Python modules) diff --git a/x2gobroker/brokers/base_broker.py b/x2gobroker/brokers/base_broker.py index 85984d3..7da6336 100644 --- a/x2gobroker/brokers/base_broker.py +++ b/x2gobroker/brokers/base_broker.py @@ -28,6 +28,7 @@ __NAME__ = 'x2gobroker-pylib' import types import copy import uuid +import netaddr # X2Go Broker modules import x2gobroker.config @@ -61,6 +62,7 @@ class X2GoBroker(object): self.config = x2gobroker.config.X2GoBrokerConfigFile(config_files=config_file, defaults=config_defaults) self._dynamic_authid_map = {} + self._client_address = None def __del__(self): """\ @@ -76,6 +78,46 @@ class X2GoBroker(object): """ return self.config.get_value(self.backend_name, 'enable') + def set_client_address(self, address): + """\ + Set the client IP address. + + @param ip: the client IP + @type ip: C{unicode} + + """ + if netaddr.valid_ipv6(address): + pass + elif netaddr.valid_ipv4(address): + pass + else: + self._client_address = None + raise ValueError('address {address} is neither a valid IPv6 nor a valid IPv4 address'.format(address=address)) + self._client_address = netaddr.IPAddress(address) + + def get_client_address(self): + """\ + Get the client IP address (if set). + + @return the client IP (either IPv4 or IPv6) + @rtype ip: C{unicode} + + """ + if self._client_address: + return unicode(self._client_address) + else: + return None + + def get_client_address_type(self): + """\ + Get the client IP address type of the client address (if set). + + @return the client address type (4: IPv4, 6: IPv6) + @rtype ip: C{int} + + """ + return self._client_address.version + def get_global_config(self): """\ Get the global section of the configuration file. @@ -237,9 +279,40 @@ class X2GoBroker(object): ### clients access is granted first, if that fails then we return False here... - # FIXME: provide code for client based access control - if not _grant_availability[u'by_client']: - return False + if len( _acls[u'acl-clients-allow'] + _acls[u'acl-clients-deny'] ) > 0: + + _acls_clients_allow = copy.deepcopy(_acls[u'acl-clients-allow']) + _acls_clients_deny = copy.deepcopy(_acls[u'acl-clients-deny']) + + _addr = self.get_client_address() + _allow_client = False + _deny_client = False + + for idx, item in enumerate(_acls_clients_allow): + if unicode(item) == u'ALL': + _acls_clients_allow[idx] = '0.0.0.0/0' + _acls_clients_allow.insert(idx, '::/0') + for idx, item in enumerate(_acls_clients_deny): + if unicode(item) == u'ALL': + _acls_clients_deny[idx] = '0.0.0.0/0' + _acls_clients_deny.insert(idx, '::/0') + + _allow_address_set = netaddr.IPSet(_acls_clients_allow) + _deny_address_set = netaddr.IPSet(_acls_clients_deny) + + _allow_client = self._client_address in _allow_address_set + _deny_client = self._client_address in _deny_address_set + + if not (_allow_client or _deny_client): + # client was not in either of the rules, so we presume that the client is allowed to access + _grant_availability[u'by_client'] + if _order[u'clients'] == 'allow-deny': + _grant_availability[u'by_client'] = _allow_client and (not _deny_client) + else: + _grant_availability[u'by_client'] = (not _deny_client) or _allow_client + + if not _grant_availability[u'by_client']: + return False ### no user/group ACLs are in use, allow access then... diff --git a/x2gobroker/tests/test_broker_base.py b/x2gobroker/tests/test_broker_base.py index bde1538..057647d 100644 --- a/x2gobroker/tests/test_broker_base.py +++ b/x2gobroker/tests/test_broker_base.py @@ -738,6 +738,440 @@ enable = true self.assertEqual(base_backend.check_profile_acls(username_t, acls), True) self.assertEqual(base_backend.check_profile_acls(username_w, acls), False) + def test_clientaddress_recognition(self): + base_backend = self._init_base_backend() + ipv4_1 = '127.0.0.1' + ipv4_2 = '10.0.0.1' + ipv4_3 = '123.456.789.101' + ipv6_1 = '::1' + ipv6_2 = 'fe80::4f8:900:e5d:2' + ipv6_3 = 'fe80:0000:0000:0000:04f8:0900:0e5d:0002' + ipv6_4 = 'fe80:wxyz:0000:0000:04f8:0900:0e5d:0002' + base_backend.set_client_address(ipv4_1) + self.assertEqual(base_backend.get_client_address(), ipv4_1) + self.assertEqual(base_backend.get_client_address_type(), 4) + base_backend.set_client_address(ipv4_2) + self.assertEqual(base_backend.get_client_address(), ipv4_2) + self.assertEqual(base_backend.get_client_address_type(), 4) + self.assertRaises(ValueError, base_backend.set_client_address, ipv4_3) + base_backend.set_client_address(ipv6_1) + self.assertEqual(base_backend.get_client_address(), ipv6_1) + self.assertEqual(base_backend.get_client_address_type(), 6) + base_backend.set_client_address(ipv6_2) + self.assertEqual(base_backend.get_client_address(), ipv6_2) + self.assertEqual(base_backend.get_client_address_type(), 6) + base_backend.set_client_address(ipv6_3) + self.assertEqual(base_backend.get_client_address(), 'fe80::4f8:900:e5d:2') + self.assertEqual(base_backend.get_client_address_type(), 6) + self.assertRaises(ValueError, base_backend.set_client_address, (ipv6_4)) + + def test_checkprofileacls_clientipv4_simpletests(self): + base_backend = self._init_base_backend() + username = 'foo' + base_backend.set_client_address('10.0.2.14') + # no ACLs will grant access + acls = { + 'acl-clients-allow': [], + 'acl-clients-deny': [], + 'acl-clients-order': 'deny-allow', + } + self.assertEqual(base_backend.check_profile_acls(username, acls), True) + acls = { + 'acl-clients-allow': [], + 'acl-clients-deny': [], + 'acl-clients-order': 'allow-deny', + } + self.assertEqual(base_backend.check_profile_acls(username, acls), True) + acls = { + 'acl-clients-allow': ['ALL'], + 'acl-clients-deny': [], + 'acl-clients-order': 'deny-allow', + } + self.assertEqual(base_backend.check_profile_acls(username, acls), True) + acls = { + 'acl-clients-allow': ['ALL'], + 'acl-clients-deny': [], + 'acl-clients-order': 'allow-deny', + } + self.assertEqual(base_backend.check_profile_acls(username, acls), True) + acls = { + 'acl-clients-allow': ['10.0.2.14'], + 'acl-clients-deny': [], + 'acl-clients-order': 'deny-allow', + } + self.assertEqual(base_backend.check_profile_acls(username, acls), True) + acls = { + 'acl-clients-allow': ['10.0.2.14'], + 'acl-clients-deny': [], + 'acl-clients-order': 'allow-deny', + } + self.assertEqual(base_backend.check_profile_acls(username, acls), True) + acls = { + 'acl-clients-allow': [], + 'acl-clients-deny': ['ALL'], + 'acl-clients-order': 'deny-allow', + } + self.assertEqual(base_backend.check_profile_acls(username, acls), False) + acls = { + 'acl-clients-allow': [], + 'acl-clients-deny': ['ALL'], + 'acl-clients-order': 'allow-deny', + } + self.assertEqual(base_backend.check_profile_acls(username, acls), False) + acls = { + 'acl-clients-allow': [], + 'acl-clients-deny': ['10.0.2.14'], + 'acl-clients-order': 'deny-allow', + } + self.assertEqual(base_backend.check_profile_acls(username, acls), False) + acls = { + 'acl-clients-allow': [], + 'acl-clients-deny': ['10.0.2.14'], + 'acl-clients-order': 'allow-deny', + } + self.assertEqual(base_backend.check_profile_acls(username, acls), False) + + def test_checkprofileacls_clientipv4_combitests(self): + base_backend = self._init_base_backend() + username = 'foo' + ipv4_1 = '10.0.2.14' + ipv4_2 = '10.0.3.14' + ipv4_3 = '8.8.8.8' + + base_backend.set_client_address(ipv4_1) + # no ACLs will grant access + acls = { + 'acl-clients-allow': ['10.0.2.0/24'], + 'acl-clients-deny': ['ALL'], + 'acl-clients-order': 'deny-allow', + } + base_backend.set_client_address(ipv4_1) + self.assertEqual(base_backend.check_profile_acls(username, acls), True) + base_backend.set_client_address(ipv4_2) + self.assertEqual(base_backend.check_profile_acls(username, acls), False) + base_backend.set_client_address(ipv4_3) + self.assertEqual(base_backend.check_profile_acls(username, acls), False) + acls = { + 'acl-clients-allow': ['ALL'], + 'acl-clients-deny': ['10.0.2.0/24'], + 'acl-clients-order': 'allow-deny', + } + base_backend.set_client_address(ipv4_1) + self.assertEqual(base_backend.check_profile_acls(username, acls), False) + base_backend.set_client_address(ipv4_2) + self.assertEqual(base_backend.check_profile_acls(username, acls), True) + base_backend.set_client_address(ipv4_3) + self.assertEqual(base_backend.check_profile_acls(username, acls), True) + acls = { + 'acl-clients-allow': ['10.0.2.0/24'], + 'acl-clients-deny': ['10.0.0.0/16', '10.0.3.0/24'], + 'acl-clients-order': 'deny-allow', + } + base_backend.set_client_address(ipv4_1) + self.assertEqual(base_backend.check_profile_acls(username, acls), True) + base_backend.set_client_address(ipv4_2) + self.assertEqual(base_backend.check_profile_acls(username, acls), False) + base_backend.set_client_address(ipv4_3) + self.assertEqual(base_backend.check_profile_acls(username, acls), True) + acls = { + 'acl-clients-allow': ['10.0.0.0/16', '10.0.3.0/24'], + 'acl-clients-deny': ['10.0.2.0/24'], + 'acl-clients-order': 'allow-deny', + } + base_backend.set_client_address(ipv4_1) + self.assertEqual(base_backend.check_profile_acls(username, acls), False) + base_backend.set_client_address(ipv4_2) + self.assertEqual(base_backend.check_profile_acls(username, acls), True) + base_backend.set_client_address(ipv4_3) + self.assertEqual(base_backend.check_profile_acls(username, acls), False) + + def test_checkprofileacls_clientipv6_simpletests(self): + base_backend = self._init_base_backend() + username = 'foo' + base_backend.set_client_address('fe80::4f8:900:e5d:2') + # no ACLs will grant access + acls = { + 'acl-clients-allow': [], + 'acl-clients-deny': [], + 'acl-clients-order': 'deny-allow', + } + self.assertEqual(base_backend.check_profile_acls(username, acls), True) + acls = { + 'acl-clients-allow': [], + 'acl-clients-deny': [], + 'acl-clients-order': 'allow-deny', + } + self.assertEqual(base_backend.check_profile_acls(username, acls), True) + acls = { + 'acl-clients-allow': ['ALL'], + 'acl-clients-deny': [], + 'acl-clients-order': 'deny-allow', + } + self.assertEqual(base_backend.check_profile_acls(username, acls), True) + acls = { + 'acl-clients-allow': ['ALL'], + 'acl-clients-deny': [], + 'acl-clients-order': 'allow-deny', + } + self.assertEqual(base_backend.check_profile_acls(username, acls), True) + acls = { + 'acl-clients-allow': ['fe80::4f8:900:e5d:2'], + 'acl-clients-deny': [], + 'acl-clients-order': 'deny-allow', + } + self.assertEqual(base_backend.check_profile_acls(username, acls), True) + acls = { + 'acl-clients-allow': ['fe80::4f8:900:e5d:2'], + 'acl-clients-deny': [], + 'acl-clients-order': 'allow-deny', + } + self.assertEqual(base_backend.check_profile_acls(username, acls), True) + acls = { + 'acl-clients-allow': [], + 'acl-clients-deny': ['ALL'], + 'acl-clients-order': 'deny-allow', + } + self.assertEqual(base_backend.check_profile_acls(username, acls), False) + acls = { + 'acl-clients-allow': [], + 'acl-clients-deny': ['ALL'], + 'acl-clients-order': 'allow-deny', + } + self.assertEqual(base_backend.check_profile_acls(username, acls), False) + acls = { + 'acl-clients-allow': [], + 'acl-clients-deny': ['fe80::4f8:900:e5d:2'], + 'acl-clients-order': 'deny-allow', + } + self.assertEqual(base_backend.check_profile_acls(username, acls), False) + acls = { + 'acl-clients-allow': [], + 'acl-clients-deny': ['fe80::4f8:900:e5d:2'], + 'acl-clients-order': 'allow-deny', + } + self.assertEqual(base_backend.check_profile_acls(username, acls), False) + + def test_checkprofileacls_clientipv6_combitests(self): + base_backend = self._init_base_backend() + username = 'foo' + ipv6_1 = 'fe80::4f8:900:e5d:2' + ipv6_2 = 'fe80::1:4f8:900:e5d:2' + ipv6_3 = '2001:1af8:4050::2' + + base_backend.set_client_address(ipv6_1) + # no ACLs will grant access + acls = { + 'acl-clients-allow': ['fe80::/64'], + 'acl-clients-deny': ['ALL'], + 'acl-clients-order': 'deny-allow', + } + base_backend.set_client_address(ipv6_1) + self.assertEqual(base_backend.check_profile_acls(username, acls), True) + base_backend.set_client_address(ipv6_2) + self.assertEqual(base_backend.check_profile_acls(username, acls), False) + base_backend.set_client_address(ipv6_3) + self.assertEqual(base_backend.check_profile_acls(username, acls), False) + acls = { + 'acl-clients-allow': ['ALL'], + 'acl-clients-deny': ['fe80::/64'], + 'acl-clients-order': 'allow-deny', + } + base_backend.set_client_address(ipv6_1) + self.assertEqual(base_backend.check_profile_acls(username, acls), False) + base_backend.set_client_address(ipv6_2) + self.assertEqual(base_backend.check_profile_acls(username, acls), True) + base_backend.set_client_address(ipv6_3) + self.assertEqual(base_backend.check_profile_acls(username, acls), True) + acls = { + 'acl-clients-allow': ['fe80::/64'], + 'acl-clients-deny': ['fe80::/56','fe80:0:0:1::/64'], + 'acl-clients-order': 'deny-allow', + } + base_backend.set_client_address(ipv6_1) + self.assertEqual(base_backend.check_profile_acls(username, acls), True) + base_backend.set_client_address(ipv6_2) + self.assertEqual(base_backend.check_profile_acls(username, acls), False) + base_backend.set_client_address(ipv6_3) + self.assertEqual(base_backend.check_profile_acls(username, acls), True) + acls = { + 'acl-clients-allow': ['fe80::/56','fe80:0:0:1::/64'], + 'acl-clients-deny': ['fe80::/64'], + 'acl-clients-order': 'allow-deny', + } + base_backend.set_client_address(ipv6_1) + self.assertEqual(base_backend.check_profile_acls(username, acls), False) + base_backend.set_client_address(ipv6_2) + self.assertEqual(base_backend.check_profile_acls(username, acls), True) + base_backend.set_client_address(ipv6_3) + self.assertEqual(base_backend.check_profile_acls(username, acls), False) + + def test_checkprofileacls_userandgroupandclient_combitests(self): + _config_defaults = copy.deepcopy(x2gobroker.defaults.X2GOBROKER_CONFIG_DEFAULTS) + _config = """ +[global] +default-user-db = testsuite +default-group-db = testsuite + +[base] +enable = true +""" + tf = tempfile.NamedTemporaryFile() + print >> tf, _config + tf.seek(0) + base_backend = base.X2GoBroker(config_file=tf.name, config_defaults=_config_defaults) + username_f = 'flip' + username_k = 'kassandra' + username_m = 'maja' + username_t = 'thekla' + username_w = 'willi' + ipv4_1 = '10.0.2.14' + ipv4_2 = '10.0.3.14' + ipv4_3 = '8.8.8.8' + ipv6_1 = 'fe80::4f8:900:e5d:2' + ipv6_2 = 'fe80::1:4f8:900:e5d:2' + ipv6_3 = '2001:1af8:4050::2' + acls = { + 'acl-users-allow': ['flip'], + 'acl-users-deny': [], + 'acl-users-order': 'deny-allow', + 'acl-groups-allow': ['female','male'], + 'acl-groups-deny': ['spiders'], + 'acl-groups-order': 'deny-allow', + 'acl-clients-allow': ['fe80:0:0:1::/64','10.0.3.0/24'], + 'acl-clients-deny': ['ALL'], + 'acl-clients-order': 'deny-allow', + } + base_backend.set_client_address(ipv4_1) + self.assertEqual(base_backend.check_profile_acls(username_f, acls), False) + self.assertEqual(base_backend.check_profile_acls(username_k, acls), False) + self.assertEqual(base_backend.check_profile_acls(username_m, acls), False) + self.assertEqual(base_backend.check_profile_acls(username_t, acls), False) + self.assertEqual(base_backend.check_profile_acls(username_w, acls), False) + base_backend.set_client_address(ipv4_2) + self.assertEqual(base_backend.check_profile_acls(username_f, acls), True) + self.assertEqual(base_backend.check_profile_acls(username_k, acls), True) + self.assertEqual(base_backend.check_profile_acls(username_m, acls), True) + self.assertEqual(base_backend.check_profile_acls(username_t, acls), True) + self.assertEqual(base_backend.check_profile_acls(username_w, acls), True) + base_backend.set_client_address(ipv4_3) + self.assertEqual(base_backend.check_profile_acls(username_f, acls), False) + self.assertEqual(base_backend.check_profile_acls(username_k, acls), False) + self.assertEqual(base_backend.check_profile_acls(username_m, acls), False) + self.assertEqual(base_backend.check_profile_acls(username_t, acls), False) + self.assertEqual(base_backend.check_profile_acls(username_w, acls), False) + base_backend.set_client_address(ipv6_1) + self.assertEqual(base_backend.check_profile_acls(username_f, acls), False) + self.assertEqual(base_backend.check_profile_acls(username_k, acls), False) + self.assertEqual(base_backend.check_profile_acls(username_m, acls), False) + self.assertEqual(base_backend.check_profile_acls(username_t, acls), False) + self.assertEqual(base_backend.check_profile_acls(username_w, acls), False) + base_backend.set_client_address(ipv6_2) + self.assertEqual(base_backend.check_profile_acls(username_f, acls), True) + self.assertEqual(base_backend.check_profile_acls(username_k, acls), True) + self.assertEqual(base_backend.check_profile_acls(username_m, acls), True) + self.assertEqual(base_backend.check_profile_acls(username_t, acls), True) + self.assertEqual(base_backend.check_profile_acls(username_w, acls), True) + base_backend.set_client_address(ipv6_3) + self.assertEqual(base_backend.check_profile_acls(username_f, acls), False) + self.assertEqual(base_backend.check_profile_acls(username_k, acls), False) + self.assertEqual(base_backend.check_profile_acls(username_m, acls), False) + self.assertEqual(base_backend.check_profile_acls(username_t, acls), False) + self.assertEqual(base_backend.check_profile_acls(username_w, acls), False) + acls = { + 'acl-users-allow': ['flip'], + 'acl-users-deny': [], + 'acl-users-order': 'deny-allow', + 'acl-groups-allow': ['female','male'], + 'acl-groups-deny': ['spiders'], + 'acl-groups-order': 'allow-deny', + 'acl-clients-allow': ['fe80::/64','10.0.2.0/24'], + 'acl-clients-deny': ['ALL'], + 'acl-clients-order': 'deny-allow', + } + base_backend.set_client_address(ipv4_1) + self.assertEqual(base_backend.check_profile_acls(username_f, acls), True) + self.assertEqual(base_backend.check_profile_acls(username_k, acls), True) + self.assertEqual(base_backend.check_profile_acls(username_m, acls), True) + self.assertEqual(base_backend.check_profile_acls(username_t, acls), False) + self.assertEqual(base_backend.check_profile_acls(username_w, acls), True) + base_backend.set_client_address(ipv4_2) + self.assertEqual(base_backend.check_profile_acls(username_f, acls), False) + self.assertEqual(base_backend.check_profile_acls(username_k, acls), False) + self.assertEqual(base_backend.check_profile_acls(username_m, acls), False) + self.assertEqual(base_backend.check_profile_acls(username_t, acls), False) + self.assertEqual(base_backend.check_profile_acls(username_w, acls), False) + base_backend.set_client_address(ipv4_3) + self.assertEqual(base_backend.check_profile_acls(username_f, acls), False) + self.assertEqual(base_backend.check_profile_acls(username_k, acls), False) + self.assertEqual(base_backend.check_profile_acls(username_m, acls), False) + self.assertEqual(base_backend.check_profile_acls(username_t, acls), False) + self.assertEqual(base_backend.check_profile_acls(username_w, acls), False) + base_backend.set_client_address(ipv6_1) + self.assertEqual(base_backend.check_profile_acls(username_f, acls), True) + self.assertEqual(base_backend.check_profile_acls(username_k, acls), True) + self.assertEqual(base_backend.check_profile_acls(username_m, acls), True) + self.assertEqual(base_backend.check_profile_acls(username_t, acls), False) + self.assertEqual(base_backend.check_profile_acls(username_w, acls), True) + base_backend.set_client_address(ipv6_2) + self.assertEqual(base_backend.check_profile_acls(username_f, acls), False) + self.assertEqual(base_backend.check_profile_acls(username_k, acls), False) + self.assertEqual(base_backend.check_profile_acls(username_m, acls), False) + self.assertEqual(base_backend.check_profile_acls(username_t, acls), False) + self.assertEqual(base_backend.check_profile_acls(username_w, acls), False) + base_backend.set_client_address(ipv6_3) + self.assertEqual(base_backend.check_profile_acls(username_f, acls), False) + self.assertEqual(base_backend.check_profile_acls(username_k, acls), False) + self.assertEqual(base_backend.check_profile_acls(username_m, acls), False) + self.assertEqual(base_backend.check_profile_acls(username_t, acls), False) + self.assertEqual(base_backend.check_profile_acls(username_w, acls), False) + acls = { + 'acl-users-allow': [], + 'acl-users-deny': [], + 'acl-users-order': 'allow-deny', + 'acl-groups-allow': ['male','female'], + 'acl-groups-deny': ['spiders','grasshoppers'], + 'acl-groups-order': 'allow-deny', + 'acl-clients-allow': ['ALL'], + 'acl-clients-deny': ['fe80::/56','10.0.0.0/8'], + 'acl-clients-order': 'allow-deny', + } + base_backend.set_client_address(ipv4_1) + self.assertEqual(base_backend.check_profile_acls(username_f, acls), False) + self.assertEqual(base_backend.check_profile_acls(username_k, acls), False) + self.assertEqual(base_backend.check_profile_acls(username_m, acls), False) + self.assertEqual(base_backend.check_profile_acls(username_t, acls), False) + self.assertEqual(base_backend.check_profile_acls(username_w, acls), False) + base_backend.set_client_address(ipv4_2) + self.assertEqual(base_backend.check_profile_acls(username_f, acls), False) + self.assertEqual(base_backend.check_profile_acls(username_k, acls), False) + self.assertEqual(base_backend.check_profile_acls(username_m, acls), False) + self.assertEqual(base_backend.check_profile_acls(username_t, acls), False) + self.assertEqual(base_backend.check_profile_acls(username_w, acls), False) + base_backend.set_client_address(ipv4_3) + self.assertEqual(base_backend.check_profile_acls(username_f, acls), False) + self.assertEqual(base_backend.check_profile_acls(username_k, acls), True) + self.assertEqual(base_backend.check_profile_acls(username_m, acls), True) + self.assertEqual(base_backend.check_profile_acls(username_t, acls), False) + self.assertEqual(base_backend.check_profile_acls(username_w, acls), True) + base_backend.set_client_address(ipv6_1) + self.assertEqual(base_backend.check_profile_acls(username_f, acls), False) + self.assertEqual(base_backend.check_profile_acls(username_k, acls), False) + self.assertEqual(base_backend.check_profile_acls(username_m, acls), False) + self.assertEqual(base_backend.check_profile_acls(username_t, acls), False) + self.assertEqual(base_backend.check_profile_acls(username_w, acls), False) + base_backend.set_client_address(ipv6_2) + self.assertEqual(base_backend.check_profile_acls(username_f, acls), False) + self.assertEqual(base_backend.check_profile_acls(username_k, acls), False) + self.assertEqual(base_backend.check_profile_acls(username_m, acls), False) + self.assertEqual(base_backend.check_profile_acls(username_t, acls), False) + self.assertEqual(base_backend.check_profile_acls(username_w, acls), False) + base_backend.set_client_address(ipv6_3) + self.assertEqual(base_backend.check_profile_acls(username_f, acls), False) + self.assertEqual(base_backend.check_profile_acls(username_k, acls), True) + self.assertEqual(base_backend.check_profile_acls(username_m, acls), True) + self.assertEqual(base_backend.check_profile_acls(username_t, acls), False) + self.assertEqual(base_backend.check_profile_acls(username_w, acls), True) + def test_suite(): from unittest import TestSuite, makeSuite diff --git a/x2gobroker/web/plain.py b/x2gobroker/web/plain.py index 49d471e..b6c5f83 100644 --- a/x2gobroker/web/plain.py +++ b/x2gobroker/web/plain.py @@ -60,6 +60,15 @@ class X2GoBrokerWebPlain: global_config = broker_backend.get_global_config() backend_config = broker_backend.get_backend_config() + # set the client address for the broker backend + ip = web.ctx.env.get('HTTP_X_FORWARDED_FOR', web.ctx.get('ip', '')) + if ip: + broker_backend.set_client_address(ip) + elif not _X2GOBROKER_DEBUG: + # if the client IP is not set, we pretend to have nothing on offer + return web.notfound() + + # if the broker backend is disabled in the configuration, pretend to have nothing on offer if not broker_backend.is_enabled(): return web.notfound() hooks/post-receive -- x2gobroker.git (HTTP(S) Session broker for X2Go) This is an automated email from the git hooks/post-receive script. It was generated because a ref change was pushed to the repository containing the project "x2gobroker.git" (HTTP(S) Session broker for X2Go).