[X2Go-Commits] x2gobroker.git - master-inai (branch) updated: 248929ec32c01181742830159aed013a880eafca

X2Go dev team git-admin at x2go.org
Tue Apr 23 21:08:20 CEST 2013


The branch, master-inai has been updated
       via  248929ec32c01181742830159aed013a880eafca (commit)
      from  4bef261502ad0d4d20751eca0073f5f295aa99ff (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.

- Log -----------------------------------------------------------------
-----------------------------------------------------------------------

Summary of changes:
 x2gobroker/backends/base.py              |  113 +++++++++++++++++++++++-
 x2gobroker/backends/inifile.py           |    7 ++
 x2gobroker/tests/test_backend_base.py    |  141 +++++++++++++++++++++++++++++-
 x2gobroker/tests/test_backend_inifile.py |    2 +-
 x2gobroker/tests/test_web_plain_base.py  |    5 +-
 5 files changed, 261 insertions(+), 7 deletions(-)

The diff of changes is:
diff --git a/x2gobroker/backends/base.py b/x2gobroker/backends/base.py
index 19cab3c..93bb288 100644
--- a/x2gobroker/backends/base.py
+++ b/x2gobroker/backends/base.py
@@ -26,6 +26,7 @@ __NAME__ = 'x2gobroker-pylib'
 
 # modules
 import types
+import copy
 import uuid
 import pam
 
@@ -125,11 +126,33 @@ class X2GoBroker(object):
         Get the session profile defaults, i.e. profile options that all
         configured session profiles have in common.
 
+        The defaults are hard-coded in L{x2gobroker.defaults} for class
+        L{x2gobroker.base.X2GoBroker}.
+
         @return: a dictionary containing the session profile defaults
         @rtype: C{dict}
 
         """
-        return {}
+        profile_defaults = copy.deepcopy(x2gobroker.defaults.X2GOBROKER_SESSIONPROFILE_DEFAULTS['DEFAULT'])
+        for key in copy.deepcopy(profile_defaults):
+            if key.startswith('acl-'):
+                del profile_defaults[key]
+        return profile_defaults
+
+    def get_acl_defaults(self):
+        """\
+        Get the ACL defaults for session profiles. The defaults are hard-coded
+        in L{x2gobroker.defaults} for class L{x2gobroker.base.X2GoBroker}.
+
+        @return: a dictionary containing the ACL defaults for all session profiles
+        @rtype: C{dict}
+
+        """
+        acl_defaults = copy.deepcopy(x2gobroker.defaults.X2GOBROKER_SESSIONPROFILE_DEFAULTS['DEFAULT'])
+        for key in copy.deepcopy(acl_defaults):
+            if not key.startswith('acl-'):
+                del acl_defaults[key]
+        return acl_defaults
 
     def get_profile(self, profile_id):
         """\
@@ -157,6 +180,94 @@ class X2GoBroker(object):
         """
         return {}
 
+    def check_profile_acls(self, username, acls):
+        """\
+        Test if a given user can get through an ACL check using <acls> as a list
+        of allow and deny rules.
+
+        @param username: the username of interest
+        @type username: C{unicode}
+        @param acls: a dictionary data structure containing ACL information (see L{defaults.X2GOBROKER_SESSIONPROFILE_DEFAULTS})
+        @type acls: C{dict}
+
+        """
+        ### extract ACLs evaluation orders
+
+        _acls = self.get_acl_defaults()
+        _acls.update(acls)
+
+        _order = {}
+        _order['users'] = _order['groups'] = _order['clients'] = _acls['acl-any-order']
+
+        try: _order['users'] = _acls['acl-users-order']
+        except KeyError: pass
+        try: _order['groups'] = _acls['acl-groups-order']
+        except KeyError: pass
+        try: _order['clients'] = _acls['acl-clients-order']
+        except KeyError: pass
+
+        # to pass an ACL test, all three keys in the dict below have to be set to True
+        # if one stays False, the related session profile will not be returned to the querying
+        # X2Go client...
+        _grant_availability = {
+            'by_user': False,
+            # FIXME: leaving the group access to False for now, we need methods that give us a generic
+            # access to the list of groups a user belongs to. One generic access is asking libnss, but:
+            # are there others?
+            'by_group': False,
+            # FIXME: set the client access to True for now as we do not yet have a way to check it...
+            'by_client': True,
+        }
+
+        ### CHECKING on a per-user basis...
+
+        _allow_user = False
+        _explicit_allow_user = False
+        _deny_user = False
+        _explicit_deny_user = False
+
+        if (username in _acls['acl-users-allow']) or ('ALL' in _acls['acl-users-allow']):
+            _allow_user = True
+
+        if username in _acls['acl-users-deny']:
+            _explicit_deny_user = True
+        if _explicit_deny_user or ('ALL' in _acls['acl-users-deny']):
+            _deny_user = True
+
+        if _order['users'] == 'allow-deny':
+            _grant_availability['by_user'] = _allow_user and not _deny_user
+        else:
+            _grant_availability['by_user'] = not _deny_user or _allow_user
+
+        ### CHECKING on a per-group basis...
+
+        # FIXME: todo
+
+        ### CHECKING on a per-client basis...
+
+        # FIXME: todo
+
+        ### clients access is granted first, if that fails then we return False here...
+
+        if not _grant_availability['by_client']:
+            return False
+
+        ### now let's deal out the several combinations of user and group ACLs...
+        ###
+
+        # if a user has been granted access directly, then the corresponding session profile
+        # will be provided to him/her.
+        if _grant_availability['by_user']:
+            return True
+
+        # if a group has been granted access, with one exception: if the thread model for users is
+        # allow-deny, then we presume that the acl-users-deny entry has precedence over
+        # acl-groups-allow/acl-groups-deny.
+        if _grant_availability['by_group'] and not _explicit_deny_user:
+            return True
+
+        return False
+
     def test_connection(self):
         #if($cgi->param('task') eq 'testcon')
         #{
diff --git a/x2gobroker/backends/inifile.py b/x2gobroker/backends/inifile.py
index 424634a..1c166c8 100644
--- a/x2gobroker/backends/inifile.py
+++ b/x2gobroker/backends/inifile.py
@@ -85,6 +85,13 @@ class X2GoBroker(base.X2GoBroker):
     def list_profiles(self, username):
 
         list_of_profiles = {}
+        for profile_id in self.get_profile_ids():
+            profile = self.get_profile(profile_id)
+            acls = self.get_profile_acls(profile_id)
+
+            if self.check_acls(username, acls):
+                list_of_profiles.update(profile)
+
         return list_of_profiles
 
     def select_profile(self, profile_name):
diff --git a/x2gobroker/tests/test_backend_base.py b/x2gobroker/tests/test_backend_base.py
index 8f26d38..b629156 100644
--- a/x2gobroker/tests/test_backend_base.py
+++ b/x2gobroker/tests/test_backend_base.py
@@ -19,6 +19,7 @@
 
 import unittest
 import tempfile
+import copy
 
 # Python X2GoBroker modules
 import x2gobroker.backends.base
@@ -26,10 +27,19 @@ import x2gobroker.defaults
 
 class TestX2GoBrokerBackendBase(unittest.TestCase):
 
+    def _init_base_backend(self):
+        _config_defaults = copy.deepcopy(x2gobroker.defaults.X2GOBROKER_CONFIG_DEFAULTS)
+        _config_defaults.update({'base': {'enable': True, }, })
+        _config = "" # use the config that derives directly from the config defaults
+        tf = tempfile.NamedTemporaryFile()
+        print >> tf, _config
+        tf.seek(0)
+        return x2gobroker.backends.base.X2GoBroker(config_file=tf.name, config_defaults=_config_defaults)
+
     ### TEST CONFIGURATION: <backend> >> enable = true|false
 
     def test_is_enabled(self):
-        _config_defaults = x2gobroker.defaults.X2GOBROKER_CONFIG_DEFAULTS
+        _config_defaults = copy.deepcopy(x2gobroker.defaults.X2GOBROKER_CONFIG_DEFAULTS)
         _config_defaults.update({'base': {'enable': True, }, })
         _config = """
 [base]
@@ -54,7 +64,7 @@ enable = true
     ### TEST CONFIGURATION: global >> check-credentials = false
 
     def test_check_access_nocreds(self):
-        _config_defaults = x2gobroker.defaults.X2GOBROKER_CONFIG_DEFAULTS
+        _config_defaults = copy.deepcopy(x2gobroker.defaults.X2GOBROKER_CONFIG_DEFAULTS)
         _config_defaults.update({'base': {'enable': True, }, })
         _config = """
 [global]
@@ -67,6 +77,133 @@ check-credentials = false
         self.assertEqual(base_backend.check_access(), True)
         tf.close()
 
+    ### TEST PROFILE DEFAULTS: get_profile_defaults()
+
+    def test_getdefaultprofile(self):
+        base_backend = self._init_base_backend()
+        _expected_profile = {
+            'defsndport': True,
+            'useiconv': False,
+            'iconvfrom': 'UTF-8',
+            'height': 600,
+            'export': '',
+            'quality': 9,
+            'fullscreen': False,
+            'layout': '',
+            'useexports': True,
+            'width': 800,
+            'speed': 2,
+            'soundsystem': 'pulse',
+            'print': True,
+            'type': 'auto',
+            'sndport': 4713,
+            'xinerama': True,
+            'variant': '',
+            'usekbd': True,
+            'fstunnel': True,
+            'applications': ['TERMINAL','WWWBROWSER','MAILCLIENT','OFFICE'],
+            'multidisp': False,
+            'sshproxyport': 22,
+            'sound': True,
+            'rootless': False,
+            'iconvto': 'UTF-8',
+            'soundtunnel': True,
+            'dpi': 96,
+            'sshport': 22,
+            'setdpi': 0,
+            'pack': '16m-jpeg',
+        }
+        _profile = base_backend.get_profile_defaults()
+        self.assertEqual(len(_expected_profile.keys()), len(_profile.keys()))
+        for key in _expected_profile.keys():
+            self.assertTrue( ( key in _profile.keys() ) )
+        for key in _profile.keys():
+            self.assertTrue( ( key in _expected_profile.keys() and _profile[key] == _expected_profile[key] ) )
+
+    ### TEST ACL DEFAULTS: get_acl_defaults()
+
+    def test_getdefaultacls(self):
+        base_backend = self._init_base_backend()
+        _expected_acls = {
+            'acl-users-allow': ['ALL'],
+            'acl-users-deny': [],
+            'acl-users-order': '',
+            'acl-groups-allow': ['ALL'],
+            'acl-groups-deny': [],
+            'acl-groups-order': '',
+            'acl-clients-allow': ['ALL'],
+            'acl-clients-deny': [],
+            'acl-clients-order': '',
+            'acl-any-order': 'deny-allow',
+        }
+        _acls = base_backend.get_acl_defaults()
+        self.assertEqual(len(_expected_acls.keys()), len(_acls.keys()))
+        for key in _expected_acls.keys():
+            self.assertTrue( ( key in _acls.keys() ) )
+        for key in _acls.keys():
+            self.assertTrue( ( key in _expected_acls.keys() and _acls[key] == _expected_acls[key] ) )
+
+    ### TEST ACL CHECK: check_profile_acls()
+
+    def test_checkprofileacls_simpletests(self):
+        base_backend = self._init_base_backend()
+        username = 'foo'
+        acls = {
+            'acl-users-allow': ['ALL'],
+            'acl-users-order': 'deny-allow',
+        }
+        self.assertEqual(base_backend.check_profile_acls(username, acls), True)
+        acls = {
+            'acl-users-allow': ['foo'],
+            'acl-users-order': 'deny-allow',
+        }
+        self.assertEqual(base_backend.check_profile_acls(username, acls), True)
+        acls = {
+            'acl-users-allow': ['ALL'],
+            'acl-users-order': 'allow-deny',
+        }
+        self.assertEqual(base_backend.check_profile_acls(username, acls), True)
+        acls = {
+            'acl-users-allow': [],
+            'acl-users-deny': ['ALL'],
+            'acl-users-order': 'deny-allow',
+        }
+        self.assertEqual(base_backend.check_profile_acls(username, acls), False)
+        acls = {
+            'acl-users-allow': [],
+            'acl-users-deny': ['ALL'],
+            'acl-users-order': 'allow-deny',
+        }
+        self.assertEqual(base_backend.check_profile_acls(username, acls), False)
+
+    def test_checkprofileacls_usercombitests(self):
+        base_backend = self._init_base_backend()
+        username = 'foo'
+        acls = {
+            'acl-users-allow': ['ALL'],
+            'acl-users-deny': ['foo'],
+            'acl-users-order': 'allow-deny',
+        }
+        self.assertEqual(base_backend.check_profile_acls(username, acls), False)
+        acls = {
+            'acl-users-allow': ['foo'],
+            'acl-users-deny': ['ALL'],
+            'acl-users-order': 'allow-deny',
+        }
+        self.assertEqual(base_backend.check_profile_acls(username, acls), False)
+        acls = {
+            'acl-users-deny': ['ALL'],
+            'acl-users-allow': ['foo'],
+            'acl-users-order': 'deny-allow',
+        }
+        self.assertEqual(base_backend.check_profile_acls(username, acls), True)
+        acls = {
+            'acl-users-deny': ['foo'],
+            'acl-users-allow': ['ALL'],
+            'acl-users-order': 'deny-allow',
+        }
+        self.assertEqual(base_backend.check_profile_acls(username, acls), True)
+
 def test_suite():
     from unittest import TestSuite, makeSuite
     suite = TestSuite()
diff --git a/x2gobroker/tests/test_backend_inifile.py b/x2gobroker/tests/test_backend_inifile.py
index bec9381..9a92730 100644
--- a/x2gobroker/tests/test_backend_inifile.py
+++ b/x2gobroker/tests/test_backend_inifile.py
@@ -44,7 +44,7 @@ class TestX2GoBrokerBackendInifile(unittest.TestCase):
     def test_getprofiledefaults(self):
         inifile_backend = x2gobroker.backends.inifile.X2GoBroker(profile_config_file='../../etc/x2gobroker-sessionprofiles.conf')
         _profile_defaults = inifile_backend.get_profile_defaults()
-        _expected_profile_defaults = x2gobroker.defaults.X2GOBROKER_SESSIONPROFILE_DEFAULTS['DEFAULT']
+        _expected_profile_defaults = copy.deepcopy(x2gobroker.defaults.X2GOBROKER_SESSIONPROFILE_DEFAULTS['DEFAULT'])
         for key in copy.deepcopy(_expected_profile_defaults):
             if key.startswith('acl-'):
                 del _expected_profile_defaults[key]
diff --git a/x2gobroker/tests/test_web_plain_base.py b/x2gobroker/tests/test_web_plain_base.py
index 29cd656..25530c4 100644
--- a/x2gobroker/tests/test_web_plain_base.py
+++ b/x2gobroker/tests/test_web_plain_base.py
@@ -22,9 +22,7 @@ import tempfile
 from paste.fixture import TestApp
 from paste.request import EnvironHeaders
 from nose.tools import *
-
-# remove later, for debugging
-import subprocess
+import copy
 
 # Python X2GoBroker modules
 import x2gobroker.backends.base
@@ -115,6 +113,7 @@ auth-mech = testsuite
 
 
 def test_suite():
+    x2gobroker.defaults.X2GOBROKER_CONFIG_DEFAULTS.update({'base': {'enable': True, 'auth-mech': 'pam', }, })
     from unittest import TestSuite, makeSuite
     suite = TestSuite()
     suite.addTest(makeSuite(TestX2GoBrokerWebPlainBase))


hooks/post-receive
-- 
x2gobroker.git (HTTP(S) Session broker for X2Go)

This is an automated email from the git hooks/post-receive script. It was
generated because a ref change was pushed to the repository containing
the project "x2gobroker.git" (HTTP(S) Session broker for X2Go).




More information about the x2go-commits mailing list