[X2Go-Commits] x2gobroker.git - build-main (branch) updated: 88bf826ab2efbc3f23da9a42280cad0ee8928aa1
X2Go dev team
git-admin at x2go.org
Sun May 19 13:04:41 CEST 2013
The branch, build-main has been updated
via 88bf826ab2efbc3f23da9a42280cad0ee8928aa1 (commit)
from f0df0578d48cb5eb91bfb1e482d68f4937817fce (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
- Log -----------------------------------------------------------------
-----------------------------------------------------------------------
Summary of changes:
README | 1 +
debian/control | 1 +
x2gobroker/brokers/base_broker.py | 79 ++++++-
x2gobroker/tests/test_broker_base.py | 434 ++++++++++++++++++++++++++++++++++
x2gobroker/web/plain.py | 9 +
5 files changed, 521 insertions(+), 3 deletions(-)
The diff of changes is:
diff --git a/README b/README
index 03d87e4..4675864 100644
--- a/README
+++ b/README
@@ -11,6 +11,7 @@ Dependencies:
* This version X2Go Session Broker works with X2Go Client
(>= 4.0.0.0) and Python X2Go (>= 0.5.0.0, not yet available)
as client.
+ * Python modules: python-pampy, python-netaddr
Available features:
diff --git a/debian/control b/debian/control
index 62e4fcf..b70e569 100644
--- a/debian/control
+++ b/debian/control
@@ -112,6 +112,7 @@ Depends:
${misc:Depends},
${python:Depends},
python-pampy,
+ python-netaddr,
Suggests:
apache2 | httpd,
Description: X2Go http(s) based session broker (Python modules)
diff --git a/x2gobroker/brokers/base_broker.py b/x2gobroker/brokers/base_broker.py
index 85984d3..7da6336 100644
--- a/x2gobroker/brokers/base_broker.py
+++ b/x2gobroker/brokers/base_broker.py
@@ -28,6 +28,7 @@ __NAME__ = 'x2gobroker-pylib'
import types
import copy
import uuid
+import netaddr
# X2Go Broker modules
import x2gobroker.config
@@ -61,6 +62,7 @@ class X2GoBroker(object):
self.config = x2gobroker.config.X2GoBrokerConfigFile(config_files=config_file, defaults=config_defaults)
self._dynamic_authid_map = {}
+ self._client_address = None
def __del__(self):
"""\
@@ -76,6 +78,46 @@ class X2GoBroker(object):
"""
return self.config.get_value(self.backend_name, 'enable')
+ def set_client_address(self, address):
+ """\
+ Set the client IP address.
+
+ @param ip: the client IP
+ @type ip: C{unicode}
+
+ """
+ if netaddr.valid_ipv6(address):
+ pass
+ elif netaddr.valid_ipv4(address):
+ pass
+ else:
+ self._client_address = None
+ raise ValueError('address {address} is neither a valid IPv6 nor a valid IPv4 address'.format(address=address))
+ self._client_address = netaddr.IPAddress(address)
+
+ def get_client_address(self):
+ """\
+ Get the client IP address (if set).
+
+ @return the client IP (either IPv4 or IPv6)
+ @rtype ip: C{unicode}
+
+ """
+ if self._client_address:
+ return unicode(self._client_address)
+ else:
+ return None
+
+ def get_client_address_type(self):
+ """\
+ Get the client IP address type of the client address (if set).
+
+ @return the client address type (4: IPv4, 6: IPv6)
+ @rtype ip: C{int}
+
+ """
+ return self._client_address.version
+
def get_global_config(self):
"""\
Get the global section of the configuration file.
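Review bot: `get_client_address_type()` returns `self._client_address.version` without a `None` check, so it raises `AttributeError` before `set_client_address()` has been called, or after a rejected address has reset the field to `None`. `get_client_address()` just above it handles that state, so the same guard fits here: `return self._client_address.version if self._client_address is not None else None`. In `get_client_address()` the test `if self._client_address:` relies on the truthiness of a `netaddr.IPAddress`, which as far as I recall is false for the all-zero address, so `is not None` is the safer comparison. The docstrings document a parameter `ip` while the signature takes `address`, and write `@rtype ip:` for return types. `@param address:`, `@type address:` and a plain `@rtype:` would match the code.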
@@ -237,9 +279,40 @@ class X2GoBroker(object):
### clients access is granted first, if that fails then we return False here...
- # FIXME: provide code for client based access control
- if not _grant_availability[u'by_client']:
- return False
+ if len( _acls[u'acl-clients-allow'] + _acls[u'acl-clients-deny'] ) > 0:
+
+ _acls_clients_allow = copy.deepcopy(_acls[u'acl-clients-allow'])
+ _acls_clients_deny = copy.deepcopy(_acls[u'acl-clients-deny'])
+
+ _addr = self.get_client_address()
+ _allow_client = False
+ _deny_client = False
+
+ for idx, item in enumerate(_acls_clients_allow):
+ if unicode(item) == u'ALL':
+ _acls_clients_allow[idx] = '0.0.0.0/0'
+ _acls_clients_allow.insert(idx, '::/0')
+ for idx, item in enumerate(_acls_clients_deny):
+ if unicode(item) == u'ALL':
+ _acls_clients_deny[idx] = '0.0.0.0/0'
+ _acls_clients_deny.insert(idx, '::/0')
+
+ _allow_address_set = netaddr.IPSet(_acls_clients_allow)
+ _deny_address_set = netaddr.IPSet(_acls_clients_deny)
+
+ _allow_client = self._client_address in _allow_address_set
+ _deny_client = self._client_address in _deny_address_set
+
+ if not (_allow_client or _deny_client):
+ # client was not in either of the rules, so we presume that the client is allowed to access
+ _grant_availability[u'by_client']
+ if _order[u'clients'] == 'allow-deny':
+ _grant_availability[u'by_client'] = _allow_client and (not _deny_client)
+ else:
+ _grant_availability[u'by_client'] = (not _deny_client) or _allow_client
+
+ if not _grant_availability[u'by_client']:
+ return False
### no user/group ACLs are in use, allow access then...
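Review bot: The line `_grant_availability[u'by_client']` under `if not (_allow_client or _deny_client):` is a bare expression that assigns nothing. As far as the flattened indentation shows, the `_order[u'clients']` branch that follows sets the value regardless. The comment says an unmatched client is presumed allowed, but under `allow-deny` the result is `_allow_client and (not _deny_client)`, i.e. denied, which is what the new tests expect for 8.8.8.8; drop the branch or reword the comment to state the per-order default. The membership tests also run on `self._client_address` when no address was ever set, so with client ACLs configured an explicit `if self._client_address is None: return False` before building the sets would fail closed rather than depend on how `netaddr.IPSet` handles `None`. `_addr` is assigned and never read, and the enclosing method starts and ends outside the hunk.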
diff --git a/x2gobroker/tests/test_broker_base.py b/x2gobroker/tests/test_broker_base.py
index bde1538..057647d 100644
--- a/x2gobroker/tests/test_broker_base.py
+++ b/x2gobroker/tests/test_broker_base.py
@@ -738,6 +738,440 @@ enable = true
self.assertEqual(base_backend.check_profile_acls(username_t, acls), True)
self.assertEqual(base_backend.check_profile_acls(username_w, acls), False)
+ def test_clientaddress_recognition(self):
+ base_backend = self._init_base_backend()
+ ipv4_1 = '127.0.0.1'
+ ipv4_2 = '10.0.0.1'
+ ipv4_3 = '123.456.789.101'
+ ipv6_1 = '::1'
+ ipv6_2 = 'fe80::4f8:900:e5d:2'
+ ipv6_3 = 'fe80:0000:0000:0000:04f8:0900:0e5d:0002'
+ ipv6_4 = 'fe80:wxyz:0000:0000:04f8:0900:0e5d:0002'
+ base_backend.set_client_address(ipv4_1)
+ self.assertEqual(base_backend.get_client_address(), ipv4_1)
+ self.assertEqual(base_backend.get_client_address_type(), 4)
+ base_backend.set_client_address(ipv4_2)
+ self.assertEqual(base_backend.get_client_address(), ipv4_2)
+ self.assertEqual(base_backend.get_client_address_type(), 4)
+ self.assertRaises(ValueError, base_backend.set_client_address, ipv4_3)
+ base_backend.set_client_address(ipv6_1)
+ self.assertEqual(base_backend.get_client_address(), ipv6_1)
+ self.assertEqual(base_backend.get_client_address_type(), 6)
+ base_backend.set_client_address(ipv6_2)
+ self.assertEqual(base_backend.get_client_address(), ipv6_2)
+ self.assertEqual(base_backend.get_client_address_type(), 6)
+ base_backend.set_client_address(ipv6_3)
+ self.assertEqual(base_backend.get_client_address(), 'fe80::4f8:900:e5d:2')
+ self.assertEqual(base_backend.get_client_address_type(), 6)
+ self.assertRaises(ValueError, base_backend.set_client_address, (ipv6_4))
+
+ def test_checkprofileacls_clientipv4_simpletests(self):
+ base_backend = self._init_base_backend()
+ username = 'foo'
+ base_backend.set_client_address('10.0.2.14')
+ # no ACLs will grant access
+ acls = {
+ 'acl-clients-allow': [],
+ 'acl-clients-deny': [],
+ 'acl-clients-order': 'deny-allow',
+ }
+ self.assertEqual(base_backend.check_profile_acls(username, acls), True)
+ acls = {
+ 'acl-clients-allow': [],
+ 'acl-clients-deny': [],
+ 'acl-clients-order': 'allow-deny',
+ }
+ self.assertEqual(base_backend.check_profile_acls(username, acls), True)
+ acls = {
+ 'acl-clients-allow': ['ALL'],
+ 'acl-clients-deny': [],
+ 'acl-clients-order': 'deny-allow',
+ }
+ self.assertEqual(base_backend.check_profile_acls(username, acls), True)
+ acls = {
+ 'acl-clients-allow': ['ALL'],
+ 'acl-clients-deny': [],
+ 'acl-clients-order': 'allow-deny',
+ }
+ self.assertEqual(base_backend.check_profile_acls(username, acls), True)
+ acls = {
+ 'acl-clients-allow': ['10.0.2.14'],
+ 'acl-clients-deny': [],
+ 'acl-clients-order': 'deny-allow',
+ }
+ self.assertEqual(base_backend.check_profile_acls(username, acls), True)
+ acls = {
+ 'acl-clients-allow': ['10.0.2.14'],
+ 'acl-clients-deny': [],
+ 'acl-clients-order': 'allow-deny',
+ }
+ self.assertEqual(base_backend.check_profile_acls(username, acls), True)
+ acls = {
+ 'acl-clients-allow': [],
+ 'acl-clients-deny': ['ALL'],
+ 'acl-clients-order': 'deny-allow',
+ }
+ self.assertEqual(base_backend.check_profile_acls(username, acls), False)
+ acls = {
+ 'acl-clients-allow': [],
+ 'acl-clients-deny': ['ALL'],
+ 'acl-clients-order': 'allow-deny',
+ }
+ self.assertEqual(base_backend.check_profile_acls(username, acls), False)
+ acls = {
+ 'acl-clients-allow': [],
+ 'acl-clients-deny': ['10.0.2.14'],
+ 'acl-clients-order': 'deny-allow',
+ }
+ self.assertEqual(base_backend.check_profile_acls(username, acls), False)
+ acls = {
+ 'acl-clients-allow': [],
+ 'acl-clients-deny': ['10.0.2.14'],
+ 'acl-clients-order': 'allow-deny',
+ }
+ self.assertEqual(base_backend.check_profile_acls(username, acls), False)
+
+ def test_checkprofileacls_clientipv4_combitests(self):
+ base_backend = self._init_base_backend()
+ username = 'foo'
+ ipv4_1 = '10.0.2.14'
+ ipv4_2 = '10.0.3.14'
+ ipv4_3 = '8.8.8.8'
+
+ base_backend.set_client_address(ipv4_1)
+ # no ACLs will grant access
+ acls = {
+ 'acl-clients-allow': ['10.0.2.0/24'],
+ 'acl-clients-deny': ['ALL'],
+ 'acl-clients-order': 'deny-allow',
+ }
+ base_backend.set_client_address(ipv4_1)
+ self.assertEqual(base_backend.check_profile_acls(username, acls), True)
+ base_backend.set_client_address(ipv4_2)
+ self.assertEqual(base_backend.check_profile_acls(username, acls), False)
+ base_backend.set_client_address(ipv4_3)
+ self.assertEqual(base_backend.check_profile_acls(username, acls), False)
+ acls = {
+ 'acl-clients-allow': ['ALL'],
+ 'acl-clients-deny': ['10.0.2.0/24'],
+ 'acl-clients-order': 'allow-deny',
+ }
+ base_backend.set_client_address(ipv4_1)
+ self.assertEqual(base_backend.check_profile_acls(username, acls), False)
+ base_backend.set_client_address(ipv4_2)
+ self.assertEqual(base_backend.check_profile_acls(username, acls), True)
+ base_backend.set_client_address(ipv4_3)
+ self.assertEqual(base_backend.check_profile_acls(username, acls), True)
+ acls = {
+ 'acl-clients-allow': ['10.0.2.0/24'],
+ 'acl-clients-deny': ['10.0.0.0/16', '10.0.3.0/24'],
+ 'acl-clients-order': 'deny-allow',
+ }
+ base_backend.set_client_address(ipv4_1)
+ self.assertEqual(base_backend.check_profile_acls(username, acls), True)
+ base_backend.set_client_address(ipv4_2)
+ self.assertEqual(base_backend.check_profile_acls(username, acls), False)
+ base_backend.set_client_address(ipv4_3)
+ self.assertEqual(base_backend.check_profile_acls(username, acls), True)
+ acls = {
+ 'acl-clients-allow': ['10.0.0.0/16', '10.0.3.0/24'],
+ 'acl-clients-deny': ['10.0.2.0/24'],
+ 'acl-clients-order': 'allow-deny',
+ }
+ base_backend.set_client_address(ipv4_1)
+ self.assertEqual(base_backend.check_profile_acls(username, acls), False)
+ base_backend.set_client_address(ipv4_2)
+ self.assertEqual(base_backend.check_profile_acls(username, acls), True)
+ base_backend.set_client_address(ipv4_3)
+ self.assertEqual(base_backend.check_profile_acls(username, acls), False)
+
+ def test_checkprofileacls_clientipv6_simpletests(self):
+ base_backend = self._init_base_backend()
+ username = 'foo'
+ base_backend.set_client_address('fe80::4f8:900:e5d:2')
+ # no ACLs will grant access
+ acls = {
+ 'acl-clients-allow': [],
+ 'acl-clients-deny': [],
+ 'acl-clients-order': 'deny-allow',
+ }
+ self.assertEqual(base_backend.check_profile_acls(username, acls), True)
+ acls = {
+ 'acl-clients-allow': [],
+ 'acl-clients-deny': [],
+ 'acl-clients-order': 'allow-deny',
+ }
+ self.assertEqual(base_backend.check_profile_acls(username, acls), True)
+ acls = {
+ 'acl-clients-allow': ['ALL'],
+ 'acl-clients-deny': [],
+ 'acl-clients-order': 'deny-allow',
+ }
+ self.assertEqual(base_backend.check_profile_acls(username, acls), True)
+ acls = {
+ 'acl-clients-allow': ['ALL'],
+ 'acl-clients-deny': [],
+ 'acl-clients-order': 'allow-deny',
+ }
+ self.assertEqual(base_backend.check_profile_acls(username, acls), True)
+ acls = {
+ 'acl-clients-allow': ['fe80::4f8:900:e5d:2'],
+ 'acl-clients-deny': [],
+ 'acl-clients-order': 'deny-allow',
+ }
+ self.assertEqual(base_backend.check_profile_acls(username, acls), True)
+ acls = {
+ 'acl-clients-allow': ['fe80::4f8:900:e5d:2'],
+ 'acl-clients-deny': [],
+ 'acl-clients-order': 'allow-deny',
+ }
+ self.assertEqual(base_backend.check_profile_acls(username, acls), True)
+ acls = {
+ 'acl-clients-allow': [],
+ 'acl-clients-deny': ['ALL'],
+ 'acl-clients-order': 'deny-allow',
+ }
+ self.assertEqual(base_backend.check_profile_acls(username, acls), False)
+ acls = {
+ 'acl-clients-allow': [],
+ 'acl-clients-deny': ['ALL'],
+ 'acl-clients-order': 'allow-deny',
+ }
+ self.assertEqual(base_backend.check_profile_acls(username, acls), False)
+ acls = {
+ 'acl-clients-allow': [],
+ 'acl-clients-deny': ['fe80::4f8:900:e5d:2'],
+ 'acl-clients-order': 'deny-allow',
+ }
+ self.assertEqual(base_backend.check_profile_acls(username, acls), False)
+ acls = {
+ 'acl-clients-allow': [],
+ 'acl-clients-deny': ['fe80::4f8:900:e5d:2'],
+ 'acl-clients-order': 'allow-deny',
+ }
+ self.assertEqual(base_backend.check_profile_acls(username, acls), False)
+
+ def test_checkprofileacls_clientipv6_combitests(self):
+ base_backend = self._init_base_backend()
+ username = 'foo'
+ ipv6_1 = 'fe80::4f8:900:e5d:2'
+ ipv6_2 = 'fe80::1:4f8:900:e5d:2'
+ ipv6_3 = '2001:1af8:4050::2'
+
+ base_backend.set_client_address(ipv6_1)
+ # no ACLs will grant access
+ acls = {
+ 'acl-clients-allow': ['fe80::/64'],
+ 'acl-clients-deny': ['ALL'],
+ 'acl-clients-order': 'deny-allow',
+ }
+ base_backend.set_client_address(ipv6_1)
+ self.assertEqual(base_backend.check_profile_acls(username, acls), True)
+ base_backend.set_client_address(ipv6_2)
+ self.assertEqual(base_backend.check_profile_acls(username, acls), False)
+ base_backend.set_client_address(ipv6_3)
+ self.assertEqual(base_backend.check_profile_acls(username, acls), False)
+ acls = {
+ 'acl-clients-allow': ['ALL'],
+ 'acl-clients-deny': ['fe80::/64'],
+ 'acl-clients-order': 'allow-deny',
+ }
+ base_backend.set_client_address(ipv6_1)
+ self.assertEqual(base_backend.check_profile_acls(username, acls), False)
+ base_backend.set_client_address(ipv6_2)
+ self.assertEqual(base_backend.check_profile_acls(username, acls), True)
+ base_backend.set_client_address(ipv6_3)
+ self.assertEqual(base_backend.check_profile_acls(username, acls), True)
+ acls = {
+ 'acl-clients-allow': ['fe80::/64'],
+ 'acl-clients-deny': ['fe80::/56','fe80:0:0:1::/64'],
+ 'acl-clients-order': 'deny-allow',
+ }
+ base_backend.set_client_address(ipv6_1)
+ self.assertEqual(base_backend.check_profile_acls(username, acls), True)
+ base_backend.set_client_address(ipv6_2)
+ self.assertEqual(base_backend.check_profile_acls(username, acls), False)
+ base_backend.set_client_address(ipv6_3)
+ self.assertEqual(base_backend.check_profile_acls(username, acls), True)
+ acls = {
+ 'acl-clients-allow': ['fe80::/56','fe80:0:0:1::/64'],
+ 'acl-clients-deny': ['fe80::/64'],
+ 'acl-clients-order': 'allow-deny',
+ }
+ base_backend.set_client_address(ipv6_1)
+ self.assertEqual(base_backend.check_profile_acls(username, acls), False)
+ base_backend.set_client_address(ipv6_2)
+ self.assertEqual(base_backend.check_profile_acls(username, acls), True)
+ base_backend.set_client_address(ipv6_3)
+ self.assertEqual(base_backend.check_profile_acls(username, acls), False)
+
+ def test_checkprofileacls_userandgroupandclient_combitests(self):
+ _config_defaults = copy.deepcopy(x2gobroker.defaults.X2GOBROKER_CONFIG_DEFAULTS)
+ _config = """
+[global]
+default-user-db = testsuite
+default-group-db = testsuite
+
+[base]
+enable = true
+"""
+ tf = tempfile.NamedTemporaryFile()
+ print >> tf, _config
+ tf.seek(0)
+ base_backend = base.X2GoBroker(config_file=tf.name, config_defaults=_config_defaults)
+ username_f = 'flip'
+ username_k = 'kassandra'
+ username_m = 'maja'
+ username_t = 'thekla'
+ username_w = 'willi'
+ ipv4_1 = '10.0.2.14'
+ ipv4_2 = '10.0.3.14'
+ ipv4_3 = '8.8.8.8'
+ ipv6_1 = 'fe80::4f8:900:e5d:2'
+ ipv6_2 = 'fe80::1:4f8:900:e5d:2'
+ ipv6_3 = '2001:1af8:4050::2'
+ acls = {
+ 'acl-users-allow': ['flip'],
+ 'acl-users-deny': [],
+ 'acl-users-order': 'deny-allow',
+ 'acl-groups-allow': ['female','male'],
+ 'acl-groups-deny': ['spiders'],
+ 'acl-groups-order': 'deny-allow',
+ 'acl-clients-allow': ['fe80:0:0:1::/64','10.0.3.0/24'],
+ 'acl-clients-deny': ['ALL'],
+ 'acl-clients-order': 'deny-allow',
+ }
+ base_backend.set_client_address(ipv4_1)
+ self.assertEqual(base_backend.check_profile_acls(username_f, acls), False)
+ self.assertEqual(base_backend.check_profile_acls(username_k, acls), False)
+ self.assertEqual(base_backend.check_profile_acls(username_m, acls), False)
+ self.assertEqual(base_backend.check_profile_acls(username_t, acls), False)
+ self.assertEqual(base_backend.check_profile_acls(username_w, acls), False)
+ base_backend.set_client_address(ipv4_2)
+ self.assertEqual(base_backend.check_profile_acls(username_f, acls), True)
+ self.assertEqual(base_backend.check_profile_acls(username_k, acls), True)
+ self.assertEqual(base_backend.check_profile_acls(username_m, acls), True)
+ self.assertEqual(base_backend.check_profile_acls(username_t, acls), True)
+ self.assertEqual(base_backend.check_profile_acls(username_w, acls), True)
+ base_backend.set_client_address(ipv4_3)
+ self.assertEqual(base_backend.check_profile_acls(username_f, acls), False)
+ self.assertEqual(base_backend.check_profile_acls(username_k, acls), False)
+ self.assertEqual(base_backend.check_profile_acls(username_m, acls), False)
+ self.assertEqual(base_backend.check_profile_acls(username_t, acls), False)
+ self.assertEqual(base_backend.check_profile_acls(username_w, acls), False)
+ base_backend.set_client_address(ipv6_1)
+ self.assertEqual(base_backend.check_profile_acls(username_f, acls), False)
+ self.assertEqual(base_backend.check_profile_acls(username_k, acls), False)
+ self.assertEqual(base_backend.check_profile_acls(username_m, acls), False)
+ self.assertEqual(base_backend.check_profile_acls(username_t, acls), False)
+ self.assertEqual(base_backend.check_profile_acls(username_w, acls), False)
+ base_backend.set_client_address(ipv6_2)
+ self.assertEqual(base_backend.check_profile_acls(username_f, acls), True)
+ self.assertEqual(base_backend.check_profile_acls(username_k, acls), True)
+ self.assertEqual(base_backend.check_profile_acls(username_m, acls), True)
+ self.assertEqual(base_backend.check_profile_acls(username_t, acls), True)
+ self.assertEqual(base_backend.check_profile_acls(username_w, acls), True)
+ base_backend.set_client_address(ipv6_3)
+ self.assertEqual(base_backend.check_profile_acls(username_f, acls), False)
+ self.assertEqual(base_backend.check_profile_acls(username_k, acls), False)
+ self.assertEqual(base_backend.check_profile_acls(username_m, acls), False)
+ self.assertEqual(base_backend.check_profile_acls(username_t, acls), False)
+ self.assertEqual(base_backend.check_profile_acls(username_w, acls), False)
+ acls = {
+ 'acl-users-allow': ['flip'],
+ 'acl-users-deny': [],
+ 'acl-users-order': 'deny-allow',
+ 'acl-groups-allow': ['female','male'],
+ 'acl-groups-deny': ['spiders'],
+ 'acl-groups-order': 'allow-deny',
+ 'acl-clients-allow': ['fe80::/64','10.0.2.0/24'],
+ 'acl-clients-deny': ['ALL'],
+ 'acl-clients-order': 'deny-allow',
+ }
+ base_backend.set_client_address(ipv4_1)
+ self.assertEqual(base_backend.check_profile_acls(username_f, acls), True)
+ self.assertEqual(base_backend.check_profile_acls(username_k, acls), True)
+ self.assertEqual(base_backend.check_profile_acls(username_m, acls), True)
+ self.assertEqual(base_backend.check_profile_acls(username_t, acls), False)
+ self.assertEqual(base_backend.check_profile_acls(username_w, acls), True)
+ base_backend.set_client_address(ipv4_2)
+ self.assertEqual(base_backend.check_profile_acls(username_f, acls), False)
+ self.assertEqual(base_backend.check_profile_acls(username_k, acls), False)
+ self.assertEqual(base_backend.check_profile_acls(username_m, acls), False)
+ self.assertEqual(base_backend.check_profile_acls(username_t, acls), False)
+ self.assertEqual(base_backend.check_profile_acls(username_w, acls), False)
+ base_backend.set_client_address(ipv4_3)
+ self.assertEqual(base_backend.check_profile_acls(username_f, acls), False)
+ self.assertEqual(base_backend.check_profile_acls(username_k, acls), False)
+ self.assertEqual(base_backend.check_profile_acls(username_m, acls), False)
+ self.assertEqual(base_backend.check_profile_acls(username_t, acls), False)
+ self.assertEqual(base_backend.check_profile_acls(username_w, acls), False)
+ base_backend.set_client_address(ipv6_1)
+ self.assertEqual(base_backend.check_profile_acls(username_f, acls), True)
+ self.assertEqual(base_backend.check_profile_acls(username_k, acls), True)
+ self.assertEqual(base_backend.check_profile_acls(username_m, acls), True)
+ self.assertEqual(base_backend.check_profile_acls(username_t, acls), False)
+ self.assertEqual(base_backend.check_profile_acls(username_w, acls), True)
+ base_backend.set_client_address(ipv6_2)
+ self.assertEqual(base_backend.check_profile_acls(username_f, acls), False)
+ self.assertEqual(base_backend.check_profile_acls(username_k, acls), False)
+ self.assertEqual(base_backend.check_profile_acls(username_m, acls), False)
+ self.assertEqual(base_backend.check_profile_acls(username_t, acls), False)
+ self.assertEqual(base_backend.check_profile_acls(username_w, acls), False)
+ base_backend.set_client_address(ipv6_3)
+ self.assertEqual(base_backend.check_profile_acls(username_f, acls), False)
+ self.assertEqual(base_backend.check_profile_acls(username_k, acls), False)
+ self.assertEqual(base_backend.check_profile_acls(username_m, acls), False)
+ self.assertEqual(base_backend.check_profile_acls(username_t, acls), False)
+ self.assertEqual(base_backend.check_profile_acls(username_w, acls), False)
+ acls = {
+ 'acl-users-allow': [],
+ 'acl-users-deny': [],
+ 'acl-users-order': 'allow-deny',
+ 'acl-groups-allow': ['male','female'],
+ 'acl-groups-deny': ['spiders','grasshoppers'],
+ 'acl-groups-order': 'allow-deny',
+ 'acl-clients-allow': ['ALL'],
+ 'acl-clients-deny': ['fe80::/56','10.0.0.0/8'],
+ 'acl-clients-order': 'allow-deny',
+ }
+ base_backend.set_client_address(ipv4_1)
+ self.assertEqual(base_backend.check_profile_acls(username_f, acls), False)
+ self.assertEqual(base_backend.check_profile_acls(username_k, acls), False)
+ self.assertEqual(base_backend.check_profile_acls(username_m, acls), False)
+ self.assertEqual(base_backend.check_profile_acls(username_t, acls), False)
+ self.assertEqual(base_backend.check_profile_acls(username_w, acls), False)
+ base_backend.set_client_address(ipv4_2)
+ self.assertEqual(base_backend.check_profile_acls(username_f, acls), False)
+ self.assertEqual(base_backend.check_profile_acls(username_k, acls), False)
+ self.assertEqual(base_backend.check_profile_acls(username_m, acls), False)
+ self.assertEqual(base_backend.check_profile_acls(username_t, acls), False)
+ self.assertEqual(base_backend.check_profile_acls(username_w, acls), False)
+ base_backend.set_client_address(ipv4_3)
+ self.assertEqual(base_backend.check_profile_acls(username_f, acls), False)
+ self.assertEqual(base_backend.check_profile_acls(username_k, acls), True)
+ self.assertEqual(base_backend.check_profile_acls(username_m, acls), True)
+ self.assertEqual(base_backend.check_profile_acls(username_t, acls), False)
+ self.assertEqual(base_backend.check_profile_acls(username_w, acls), True)
+ base_backend.set_client_address(ipv6_1)
+ self.assertEqual(base_backend.check_profile_acls(username_f, acls), False)
+ self.assertEqual(base_backend.check_profile_acls(username_k, acls), False)
+ self.assertEqual(base_backend.check_profile_acls(username_m, acls), False)
+ self.assertEqual(base_backend.check_profile_acls(username_t, acls), False)
+ self.assertEqual(base_backend.check_profile_acls(username_w, acls), False)
+ base_backend.set_client_address(ipv6_2)
+ self.assertEqual(base_backend.check_profile_acls(username_f, acls), False)
+ self.assertEqual(base_backend.check_profile_acls(username_k, acls), False)
+ self.assertEqual(base_backend.check_profile_acls(username_m, acls), False)
+ self.assertEqual(base_backend.check_profile_acls(username_t, acls), False)
+ self.assertEqual(base_backend.check_profile_acls(username_w, acls), False)
+ base_backend.set_client_address(ipv6_3)
+ self.assertEqual(base_backend.check_profile_acls(username_f, acls), False)
+ self.assertEqual(base_backend.check_profile_acls(username_k, acls), True)
+ self.assertEqual(base_backend.check_profile_acls(username_m, acls), True)
+ self.assertEqual(base_backend.check_profile_acls(username_t, acls), False)
+ self.assertEqual(base_backend.check_profile_acls(username_w, acls), True)
+
def test_suite():
from unittest import TestSuite, makeSuite
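Review bot: None of the added tests covers client ACLs being configured while no client address is set. That state is reachable here, because the rejected `set_client_address(ipv4_3)` call resets `_client_address` to `None` before raising. Following that `assertRaises` with `self.assertEqual(base_backend.get_client_address(), None)`, plus one `check_profile_acls` case with a non-empty `acl-clients-deny`, would pin the intended fail-closed result. The comment `# no ACLs will grant access` is also copied into the combi tests, where the dict right after it does define client ACLs, so it should be removed or reworded there. `test_suite` continues past the end of the hunk.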
diff --git a/x2gobroker/web/plain.py b/x2gobroker/web/plain.py
index 49d471e..b6c5f83 100644
--- a/x2gobroker/web/plain.py
+++ b/x2gobroker/web/plain.py
@@ -60,6 +60,15 @@ class X2GoBrokerWebPlain:
global_config = broker_backend.get_global_config()
backend_config = broker_backend.get_backend_config()
+ # set the client address for the broker backend
+ ip = web.ctx.env.get('HTTP_X_FORWARDED_FOR', web.ctx.get('ip', ''))
+ if ip:
+ broker_backend.set_client_address(ip)
+ elif not _X2GOBROKER_DEBUG:
+ # if the client IP is not set, we pretend to have nothing on offer
+ return web.notfound()
+
+ # if the broker backend is disabled in the configuration, pretend to have nothing on offer
if not broker_backend.is_enabled():
return web.notfound()
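Review bot: `HTTP_X_FORWARDED_FOR` is a request header the client controls, and the added lookup prefers it over `web.ctx.ip`. The address the client ACLs are checked against is therefore whatever the caller claims, unless a front-end proxy always overwrites the header. The header should be honoured only when the connecting peer is a known reverse proxy, and then only its right-most entry. It can also carry a comma-separated list, which `set_client_address()` rejects with a `ValueError` that nothing in this hunk catches. The handler method starts and ends outside the hunk, and `TRUSTED_PROXIES` below is a placeholder for a configured allow-list, not a name from this page.

```python
# start from the peer address the server itself observed
ip = web.ctx.get('ip', '')
# TRUSTED_PROXIES: placeholder for a configured reverse-proxy allow-list (not defined on this page)
if ip in TRUSTED_PROXIES:
    forwarded = web.ctx.env.get('HTTP_X_FORWARDED_FOR', '')
    if forwarded:
        # the right-most entry is the hop appended by the trusted proxy itself
        ip = forwarded.split(',')[-1].strip()
try:
    if ip:
        broker_backend.set_client_address(ip)
    elif not _X2GOBROKER_DEBUG:
        return web.notfound()
except ValueError:
    # an unparsable address is handled like a missing one
    return web.notfound()
# … unchanged: is_enabled() check …
```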
hooks/post-receive
--
x2gobroker.git (HTTP(S) Session broker for X2Go)
This is an automated email from the git hooks/post-receive script. It was
generated because a ref change was pushed to the repository containing
the project "x2gobroker.git" (HTTP(S) Session broker for X2Go).