The branch, statusflag has been updated via 248929ec32c01181742830159aed013a880eafca (commit) from 4bef261502ad0d4d20751eca0073f5f295aa99ff (commit) Those revisions listed above that are new to this repository have not appeared on any other notification email; so we list those revisions in full, below. - Log ----------------------------------------------------------------- ----------------------------------------------------------------------- Summary of changes: x2gobroker/backends/base.py | 113 +++++++++++++++++++++++- x2gobroker/backends/inifile.py | 7 ++ x2gobroker/tests/test_backend_base.py | 141 +++++++++++++++++++++++++++++- x2gobroker/tests/test_backend_inifile.py | 2 +- x2gobroker/tests/test_web_plain_base.py | 5 +- 5 files changed, 261 insertions(+), 7 deletions(-) The diff of changes is: diff --git a/x2gobroker/backends/base.py b/x2gobroker/backends/base.py index 19cab3c..93bb288 100644 --- a/x2gobroker/backends/base.py +++ b/x2gobroker/backends/base.py @@ -26,6 +26,7 @@ __NAME__ = 'x2gobroker-pylib' # modules import types +import copy import uuid import pam @@ -125,11 +126,33 @@ class X2GoBroker(object): Get the session profile defaults, i.e. profile options that all configured session profiles have in common. + The defaults are hard-coded in L{x2gobroker.defaults} for class + L{x2gobroker.base.X2GoBroker}. + @return: a dictionary containing the session profile defaults @rtype: C{dict} """ - return {} + profile_defaults = copy.deepcopy(x2gobroker.defaults.X2GOBROKER_SESSIONPROFILE_DEFAULTS['DEFAULT']) + for key in copy.deepcopy(profile_defaults): + if key.startswith('acl-'): + del profile_defaults[key] + return profile_defaults + + def get_acl_defaults(self): + """\ + Get the ACL defaults for session profiles. The defaults are hard-coded + in L{x2gobroker.defaults} for class L{x2gobroker.base.X2GoBroker}. + + @return: a dictionary containing the ACL defaults for all session profiles + @rtype: C{dict} + + """ + acl_defaults = copy.deepcopy(x2gobroker.defaults.X2GOBROKER_SESSIONPROFILE_DEFAULTS['DEFAULT']) + for key in copy.deepcopy(acl_defaults): + if not key.startswith('acl-'): + del acl_defaults[key] + return acl_defaults def get_profile(self, profile_id): """\ @@ -157,6 +180,94 @@ class X2GoBroker(object): """ return {} + def check_profile_acls(self, username, acls): + """\ + Test if a given user can get through an ACL check using <acls> as a list + of allow and deny rules. + + @param username: the username of interest + @type username: C{unicode} + @param acls: a dictionary data structure containing ACL information (see L{defaults.X2GOBROKER_SESSIONPROFILE_DEFAULTS}) + @type acls: C{dict} + + """ + ### extract ACLs evaluation orders + + _acls = self.get_acl_defaults() + _acls.update(acls) + + _order = {} + _order['users'] = _order['groups'] = _order['clients'] = _acls['acl-any-order'] + + try: _order['users'] = _acls['acl-users-order'] + except KeyError: pass + try: _order['groups'] = _acls['acl-groups-order'] + except KeyError: pass + try: _order['clients'] = _acls['acl-clients-order'] + except KeyError: pass + + # to pass an ACL test, all three keys in the dict below have to be set to True + # if one stays False, the related session profile will not be returned to the querying + # X2Go client... + _grant_availability = { + 'by_user': False, + # FIXME: leaving the group access to False for now, we need methods that give us a generic + # acces to the list of groups a user belongs to. One generic access is asking libnss, but: + # are there others? + 'by_group': False, + # FIXME: set the client access to True for now as we have not a way to check that available... + 'by_client': True, + } + + ### CHECKING on a per-user basis... + + _allow_user = False + _explicit_allow_user = False + _deny_user = False + _explicit_deny_user = False + + if (username in _acls['acl-users-allow']) or ('ALL' in _acls['acl-users-allow']): + _allow_user = True + + if username in _acls['acl-users-deny']: + _explicit_deny_user = True + if _explicit_deny_user or ('ALL' in _acls['acl-users-deny']): + _deny_user = True + + if _order['users'] == 'allow-deny': + _grant_availability['by_user'] = _allow_user and not _deny_user + else: + _grant_availability['by_user'] = not _deny_user or _allow_user + + ### CHECKING on a per-group basis... + + # FIXME: todo + + ### CHECKING on a per-client basis... + + # FIXME: todo + + ### clients access is granted first, if that fails then we return False here... + + if not _grant_availability['by_client']: + return False + + ### now let's deal out the several combinations of user and group ACLs... + ### + + # if a user has been granted access directly, then the corresponding session profile + # will be provided to him/her. + if _grant_availability['by_user']: + return True + + # if a group has been granted access, with one exception: if the thread model for users is + # allow-deny, then we presume that the acl-users-deny entry has precendence over + # acl-groups-allow/acl-groups-deny. + if _grant_availability['by_group'] and not _explicit_deny_user: + return True + + return False + def test_connection(self): #if($cgi->param('task') eq 'testcon') #{ diff --git a/x2gobroker/backends/inifile.py b/x2gobroker/backends/inifile.py index 424634a..1c166c8 100644 --- a/x2gobroker/backends/inifile.py +++ b/x2gobroker/backends/inifile.py @@ -85,6 +85,13 @@ class X2GoBroker(base.X2GoBroker): def list_profiles(self, username): list_of_profiles = {} + for profile_id in self.get_profile_ids(): + profile = self.get_profile(profile_id) + acls = self.get_profile_acls(profile_id) + + if self.check_acls(username, acls): + list_of_profiles.update(profile) + return list_of_profiles def select_profile(self, profile_name): diff --git a/x2gobroker/tests/test_backend_base.py b/x2gobroker/tests/test_backend_base.py index 8f26d38..b629156 100644 --- a/x2gobroker/tests/test_backend_base.py +++ b/x2gobroker/tests/test_backend_base.py @@ -19,6 +19,7 @@ import unittest import tempfile +import copy # Python X2GoBroker modules import x2gobroker.backends.base @@ -26,10 +27,19 @@ import x2gobroker.defaults class TestX2GoBrokerBackendBase(unittest.TestCase): + def _init_base_backend(self): + _config_defaults = copy.deepcopy(x2gobroker.defaults.X2GOBROKER_CONFIG_DEFAULTS) + _config_defaults.update({'base': {'enable': True, }, }) + _config = "" # use the config that derives directly from the config defaults + tf = tempfile.NamedTemporaryFile() + print >> tf, _config + tf.seek(0) + return x2gobroker.backends.base.X2GoBroker(config_file=tf.name, config_defaults=_config_defaults) + ### TEST CONFIGURATION: <backend> >> enable = true|false def test_is_enabled(self): - _config_defaults = x2gobroker.defaults.X2GOBROKER_CONFIG_DEFAULTS + _config_defaults = copy.deepcopy(x2gobroker.defaults.X2GOBROKER_CONFIG_DEFAULTS) _config_defaults.update({'base': {'enable': True, }, }) _config = """ [base] @@ -54,7 +64,7 @@ enable = true ### TEST CONFIGURATION: global >> check-credentials = false def test_check_access_nocreds(self): - _config_defaults = x2gobroker.defaults.X2GOBROKER_CONFIG_DEFAULTS + _config_defaults = copy.deepcopy(x2gobroker.defaults.X2GOBROKER_CONFIG_DEFAULTS) _config_defaults.update({'base': {'enable': True, }, }) _config = """ [global] @@ -67,6 +77,133 @@ check-credentials = false self.assertEqual(base_backend.check_access(), True) tf.close() + ### TEST PROFILE DEFAULTS: get_profile_defaults() + + def test_getdefaultprofile(self): + base_backend = self._init_base_backend() + _expected_profile = { + 'defsndport': True, + 'useiconv': False, + 'iconvfrom': 'UTF-8', + 'height': 600, + 'export': '', + 'quality': 9, + 'fullscreen': False, + 'layout': '', + 'useexports': True, + 'width': 800, + 'speed': 2, + 'soundsystem': 'pulse', + 'print': True, + 'type': 'auto', + 'sndport': 4713, + 'xinerama': True, + 'variant': '', + 'usekbd': True, + 'fstunnel': True, + 'applications': ['TERMINAL','WWWBROWSER','MAILCLIENT','OFFICE'], + 'multidisp': False, + 'sshproxyport': 22, + 'sound': True, + 'rootless': False, + 'iconvto': 'UTF-8', + 'soundtunnel': True, + 'dpi': 96, + 'sshport': 22, + 'setdpi': 0, + 'pack': '16m-jpeg', + } + _profile = base_backend.get_profile_defaults() + self.assertEqual(len(_expected_profile.keys()), len(_profile.keys())) + for key in _expected_profile.keys(): + self.assertTrue( ( key in _profile.keys() ) ) + for key in _profile.keys(): + self.assertTrue( ( key in _expected_profile.keys() and _profile[key] == _expected_profile[key] ) ) + + ### TEST ACL DEFAULTS: get_acl_defaults() + + def test_getdefaultacls(self): + base_backend = self._init_base_backend() + _expected_acls = { + 'acl-users-allow': ['ALL'], + 'acl-users-deny': [], + 'acl-users-order': '', + 'acl-groups-allow': ['ALL'], + 'acl-groups-deny': [], + 'acl-groups-order': '', + 'acl-clients-allow': ['ALL'], + 'acl-clients-deny': [], + 'acl-clients-order': '', + 'acl-any-order': 'deny-allow', + } + _acls = base_backend.get_acl_defaults() + self.assertEqual(len(_expected_acls.keys()), len(_acls.keys())) + for key in _expected_acls.keys(): + self.assertTrue( ( key in _acls.keys() ) ) + for key in _acls.keys(): + self.assertTrue( ( key in _expected_acls.keys() and _acls[key] == _expected_acls[key] ) ) + + ### TEST ACL CHECK: check_profile_acls() + + def test_checkprofileacls_simpletests(self): + base_backend = self._init_base_backend() + username = 'foo' + acls = { + 'acl-users-allow': ['ALL'], + 'acl-users-order': 'deny-allow', + } + self.assertEqual(base_backend.check_profile_acls(username, acls), True) + acls = { + 'acl-users-allow': ['foo'], + 'acl-users-order': 'deny-allow', + } + self.assertEqual(base_backend.check_profile_acls(username, acls), True) + acls = { + 'acl-users-allow': ['ALL'], + 'acl-users-order': 'allow-deny', + } + self.assertEqual(base_backend.check_profile_acls(username, acls), True) + acls = { + 'acl-users-allow': [], + 'acl-users-deny': ['ALL'], + 'acl-users-order': 'deny-allow', + } + self.assertEqual(base_backend.check_profile_acls(username, acls), False) + acls = { + 'acl-users-allow': [], + 'acl-users-deny': ['ALL'], + 'acl-users-order': 'allow-deny', + } + self.assertEqual(base_backend.check_profile_acls(username, acls), False) + + def test_checkprofileacls_usercombitests(self): + base_backend = self._init_base_backend() + username = 'foo' + acls = { + 'acl-users-allow': ['ALL'], + 'acl-users-deny': ['foo'], + 'acl-users-order': 'allow-deny', + } + self.assertEqual(base_backend.check_profile_acls(username, acls), False) + acls = { + 'acl-users-allow': ['foo'], + 'acl-users-deny': ['ALL'], + 'acl-users-order': 'allow-deny', + } + self.assertEqual(base_backend.check_profile_acls(username, acls), False) + acls = { + 'acl-users-deny': ['ALL'], + 'acl-users-allow': ['foo'], + 'acl-users-order': 'deny-allow', + } + self.assertEqual(base_backend.check_profile_acls(username, acls), True) + acls = { + 'acl-users-deny': ['foo'], + 'acl-users-allow': ['ALL'], + 'acl-users-order': 'deny-allow', + } + self.assertEqual(base_backend.check_profile_acls(username, acls), True) + def test_suite(): from unittest import TestSuite, makeSuite suite = TestSuite() diff --git a/x2gobroker/tests/test_backend_inifile.py b/x2gobroker/tests/test_backend_inifile.py index bec9381..9a92730 100644 --- a/x2gobroker/tests/test_backend_inifile.py +++ b/x2gobroker/tests/test_backend_inifile.py @@ -44,7 +44,7 @@ class TestX2GoBrokerBackendInifile(unittest.TestCase): def test_getprofiledefaults(self): inifile_backend = x2gobroker.backends.inifile.X2GoBroker(profile_config_file='../../etc/x2gobroker-sessionprofiles.conf') _profile_defaults = inifile_backend.get_profile_defaults() - _expected_profile_defaults = x2gobroker.defaults.X2GOBROKER_SESSIONPROFILE_DEFAULTS['DEFAULT'] + _expected_profile_defaults = copy.deepcopy(x2gobroker.defaults.X2GOBROKER_SESSIONPROFILE_DEFAULTS['DEFAULT']) for key in copy.deepcopy(_expected_profile_defaults): if key.startswith('acl-'): del _expected_profile_defaults[key] diff --git a/x2gobroker/tests/test_web_plain_base.py b/x2gobroker/tests/test_web_plain_base.py index 29cd656..25530c4 100644 --- a/x2gobroker/tests/test_web_plain_base.py +++ b/x2gobroker/tests/test_web_plain_base.py @@ -22,9 +22,7 @@ import tempfile from paste.fixture import TestApp from paste.request import EnvironHeaders from nose.tools import * - -# remove later, for debugging -import subprocess +import copy # Python X2GoBroker modules import x2gobroker.backends.base @@ -115,6 +113,7 @@ auth-mech = testsuite def test_suite(): + x2gobroker.defaults.X2GOBROKER_CONFIG_DEFAULTS.update({'base': {'enable': True, 'auth-mech': 'pam', }, }) from unittest import TestSuite, makeSuite suite = TestSuite() suite.addTest(makeSuite(TestX2GoBrokerWebPlainBase)) hooks/post-receive -- x2gobroker.git (HTTP(S) Session broker for X2Go) This is an automated email from the git hooks/post-receive script. It was generated because a ref change was pushed to the repository containing the project "x2gobroker.git" (HTTP(S) Session broker for X2Go).