[X2Go-Commits] x2gobroker.git - build-main (branch) updated: 248929ec32c01181742830159aed013a880eafca
X2Go dev team
git-admin at x2go.org
Sun May 19 13:03:01 CEST 2013
The branch, build-main has been updated
via 248929ec32c01181742830159aed013a880eafca (commit)
from 4bef261502ad0d4d20751eca0073f5f295aa99ff (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
- Log -----------------------------------------------------------------
-----------------------------------------------------------------------
Summary of changes:
x2gobroker/backends/base.py | 113 +++++++++++++++++++++++-
x2gobroker/backends/inifile.py | 7 ++
x2gobroker/tests/test_backend_base.py | 141 +++++++++++++++++++++++++++++-
x2gobroker/tests/test_backend_inifile.py | 2 +-
x2gobroker/tests/test_web_plain_base.py | 5 +-
5 files changed, 261 insertions(+), 7 deletions(-)
The diff of changes is:
diff --git a/x2gobroker/backends/base.py b/x2gobroker/backends/base.py
index 19cab3c..93bb288 100644
--- a/x2gobroker/backends/base.py
+++ b/x2gobroker/backends/base.py
@@ -26,6 +26,7 @@ __NAME__ = 'x2gobroker-pylib'
# modules
import types
+import copy
import uuid
import pam
@@ -125,11 +126,33 @@ class X2GoBroker(object):
Get the session profile defaults, i.e. profile options that all
configured session profiles have in common.
+ The defaults are hard-coded in L{x2gobroker.defaults} for class
+ L{x2gobroker.base.X2GoBroker}.
+
@return: a dictionary containing the session profile defaults
@rtype: C{dict}
"""
- return {}
+ profile_defaults = copy.deepcopy(x2gobroker.defaults.X2GOBROKER_SESSIONPROFILE_DEFAULTS['DEFAULT'])
+ for key in copy.deepcopy(profile_defaults):
+ if key.startswith('acl-'):
+ del profile_defaults[key]
+ return profile_defaults
+
+ def get_acl_defaults(self):
+ """\
+ Get the ACL defaults for session profiles. The defaults are hard-coded
+ in L{x2gobroker.defaults} for class L{x2gobroker.base.X2GoBroker}.
+
+ @return: a dictionary containing the ACL defaults for all session profiles
+ @rtype: C{dict}
+
+ """
+ acl_defaults = copy.deepcopy(x2gobroker.defaults.X2GOBROKER_SESSIONPROFILE_DEFAULTS['DEFAULT'])
+ for key in copy.deepcopy(acl_defaults):
+ if not key.startswith('acl-'):
+ del acl_defaults[key]
+ return acl_defaults
def get_profile(self, profile_id):
"""\
@@ -157,6 +180,94 @@ class X2GoBroker(object):
"""
return {}
+ def check_profile_acls(self, username, acls):
+ """\
+ Test if a given user can get through an ACL check using <acls> as a list
+ of allow and deny rules.
+
+ @param username: the username of interest
+ @type username: C{unicode}
+ @param acls: a dictionary data structure containing ACL information (see L{defaults.X2GOBROKER_SESSIONPROFILE_DEFAULTS})
+ @type acls: C{dict}
+
+ """
+ ### extract ACLs evaluation orders
+
+ _acls = self.get_acl_defaults()
+ _acls.update(acls)
+
+ _order = {}
+ _order['users'] = _order['groups'] = _order['clients'] = _acls['acl-any-order']
+
+ try: _order['users'] = _acls['acl-users-order']
+ except KeyError: pass
+ try: _order['groups'] = _acls['acl-groups-order']
+ except KeyError: pass
+ try: _order['clients'] = _acls['acl-clients-order']
+ except KeyError: pass
+
+ # to pass an ACL test, all three keys in the dict below have to be set to True
+ # if one stays False, the related session profile will not be returned to the querying
+ # X2Go client...
+ _grant_availability = {
+ 'by_user': False,
+ # FIXME: leaving the group access set to False for now; we need methods that give us generic
+ # access to the list of groups a user belongs to. One generic approach is asking libnss, but:
+ # are there others?
+ 'by_group': False,
+ # FIXME: set the client access to True for now, as we do not yet have a way to check it...
+ 'by_client': True,
+ }
+
+ ### CHECKING on a per-user basis...
+
+ _allow_user = False
+ _explicit_allow_user = False
+ _deny_user = False
+ _explicit_deny_user = False
+
+ if (username in _acls['acl-users-allow']) or ('ALL' in _acls['acl-users-allow']):
+ _allow_user = True
+
+ if username in _acls['acl-users-deny']:
+ _explicit_deny_user = True
+ if _explicit_deny_user or ('ALL' in _acls['acl-users-deny']):
+ _deny_user = True
+
+ if _order['users'] == 'allow-deny':
+ _grant_availability['by_user'] = _allow_user and not _deny_user
+ else:
+ _grant_availability['by_user'] = not _deny_user or _allow_user
+
+ ### CHECKING on a per-group basis...
+
+ # FIXME: todo
+
+ ### CHECKING on a per-client basis...
+
+ # FIXME: todo
+
+ ### client access is checked first; if that check fails, we return False here...
+
+ if not _grant_availability['by_client']:
+ return False
+
+ ### now let's deal out the several combinations of user and group ACLs...
+ ###
+
+ # if a user has been granted access directly, then the corresponding session profile
+ # will be provided to him/her.
+ if _grant_availability['by_user']:
+ return True
+
+ # if a group has been granted access, the session profile will be provided as well, with one
+ # exception: if the evaluation order for users is allow-deny, then we presume that the
+ # acl-users-deny entry has precedence over acl-groups-allow/acl-groups-deny.
+ if _grant_availability['by_group'] and not _explicit_deny_user:
+ return True
+
+ return False
+
def test_connection(self):
#if($cgi->param('task') eq 'testcon')
#{
diff --git a/x2gobroker/backends/inifile.py b/x2gobroker/backends/inifile.py
index 424634a..1c166c8 100644
--- a/x2gobroker/backends/inifile.py
+++ b/x2gobroker/backends/inifile.py
@@ -85,6 +85,13 @@ class X2GoBroker(base.X2GoBroker):
def list_profiles(self, username):
list_of_profiles = {}
+ for profile_id in self.get_profile_ids():
+ profile = self.get_profile(profile_id)
+ acls = self.get_profile_acls(profile_id)
+
+ if self.check_acls(username, acls):
+ list_of_profiles.update(profile)
+
return list_of_profiles
def select_profile(self, profile_name):
diff --git a/x2gobroker/tests/test_backend_base.py b/x2gobroker/tests/test_backend_base.py
index 8f26d38..b629156 100644
--- a/x2gobroker/tests/test_backend_base.py
+++ b/x2gobroker/tests/test_backend_base.py
@@ -19,6 +19,7 @@
import unittest
import tempfile
+import copy
# Python X2GoBroker modules
import x2gobroker.backends.base
@@ -26,10 +27,19 @@ import x2gobroker.defaults
class TestX2GoBrokerBackendBase(unittest.TestCase):
+ def _init_base_backend(self):
+ _config_defaults = copy.deepcopy(x2gobroker.defaults.X2GOBROKER_CONFIG_DEFAULTS)
+ _config_defaults.update({'base': {'enable': True, }, })
+ _config = "" # use the config that derives directly from the config defaults
+ tf = tempfile.NamedTemporaryFile()
+ print >> tf, _config
+ tf.seek(0)
+ return x2gobroker.backends.base.X2GoBroker(config_file=tf.name, config_defaults=_config_defaults)
+
### TEST CONFIGURATION: <backend> >> enable = true|false
def test_is_enabled(self):
- _config_defaults = x2gobroker.defaults.X2GOBROKER_CONFIG_DEFAULTS
+ _config_defaults = copy.deepcopy(x2gobroker.defaults.X2GOBROKER_CONFIG_DEFAULTS)
_config_defaults.update({'base': {'enable': True, }, })
_config = """
[base]
@@ -54,7 +64,7 @@ enable = true
### TEST CONFIGURATION: global >> check-credentials = false
def test_check_access_nocreds(self):
- _config_defaults = x2gobroker.defaults.X2GOBROKER_CONFIG_DEFAULTS
+ _config_defaults = copy.deepcopy(x2gobroker.defaults.X2GOBROKER_CONFIG_DEFAULTS)
_config_defaults.update({'base': {'enable': True, }, })
_config = """
[global]
@@ -67,6 +77,133 @@ check-credentials = false
self.assertEqual(base_backend.check_access(), True)
tf.close()
+ ### TEST PROFILE DEFAULTS: get_profile_defaults()
+
+ def test_getdefaultprofile(self):
+ base_backend = self._init_base_backend()
+ _expected_profile = {
+ 'defsndport': True,
+ 'useiconv': False,
+ 'iconvfrom': 'UTF-8',
+ 'height': 600,
+ 'export': '',
+ 'quality': 9,
+ 'fullscreen': False,
+ 'layout': '',
+ 'useexports': True,
+ 'width': 800,
+ 'speed': 2,
+ 'soundsystem': 'pulse',
+ 'print': True,
+ 'type': 'auto',
+ 'sndport': 4713,
+ 'xinerama': True,
+ 'variant': '',
+ 'usekbd': True,
+ 'fstunnel': True,
+ 'applications': ['TERMINAL','WWWBROWSER','MAILCLIENT','OFFICE'],
+ 'multidisp': False,
+ 'sshproxyport': 22,
+ 'sound': True,
+ 'rootless': False,
+ 'iconvto': 'UTF-8',
+ 'soundtunnel': True,
+ 'dpi': 96,
+ 'sshport': 22,
+ 'setdpi': 0,
+ 'pack': '16m-jpeg',
+ }
+ _profile = base_backend.get_profile_defaults()
+ self.assertEqual(len(_expected_profile.keys()), len(_profile.keys()))
+ for key in _expected_profile.keys():
+ self.assertTrue( ( key in _profile.keys() ) )
+ for key in _profile.keys():
+ self.assertTrue( ( key in _expected_profile.keys() and _profile[key] == _expected_profile[key] ) )
+
+ ### TEST ACL DEFAULTS: get_acl_defaults()
+
+ def test_getdefaultacls(self):
+ base_backend = self._init_base_backend()
+ _expected_acls = {
+ 'acl-users-allow': ['ALL'],
+ 'acl-users-deny': [],
+ 'acl-users-order': '',
+ 'acl-groups-allow': ['ALL'],
+ 'acl-groups-deny': [],
+ 'acl-groups-order': '',
+ 'acl-clients-allow': ['ALL'],
+ 'acl-clients-deny': [],
+ 'acl-clients-order': '',
+ 'acl-any-order': 'deny-allow',
+ }
+ _acls = base_backend.get_acl_defaults()
+ self.assertEqual(len(_expected_acls.keys()), len(_acls.keys()))
+ for key in _expected_acls.keys():
+ self.assertTrue( ( key in _acls.keys() ) )
+ for key in _acls.keys():
+ self.assertTrue( ( key in _expected_acls.keys() and _acls[key] == _expected_acls[key] ) )
+
+ ### TEST ACL CHECK: check_profile_acls()
+
+ def test_checkprofileacls_simpletests(self):
+ base_backend = self._init_base_backend()
+ username = 'foo'
+ acls = {
+ 'acl-users-allow': ['ALL'],
+ 'acl-users-order': 'deny-allow',
+ }
+ self.assertEqual(base_backend.check_profile_acls(username, acls), True)
+ acls = {
+ 'acl-users-allow': ['foo'],
+ 'acl-users-order': 'deny-allow',
+ }
+ self.assertEqual(base_backend.check_profile_acls(username, acls), True)
+ acls = {
+ 'acl-users-allow': ['ALL'],
+ 'acl-users-order': 'allow-deny',
+ }
+ self.assertEqual(base_backend.check_profile_acls(username, acls), True)
+ acls = {
+ 'acl-users-allow': [],
+ 'acl-users-deny': ['ALL'],
+ 'acl-users-order': 'deny-allow',
+ }
+ self.assertEqual(base_backend.check_profile_acls(username, acls), False)
+ acls = {
+ 'acl-users-allow': [],
+ 'acl-users-deny': ['ALL'],
+ 'acl-users-order': 'allow-deny',
+ }
+ self.assertEqual(base_backend.check_profile_acls(username, acls), False)
+
+ def test_checkprofileacls_usercombitests(self):
+ base_backend = self._init_base_backend()
+ username = 'foo'
+ acls = {
+ 'acl-users-allow': ['ALL'],
+ 'acl-users-deny': ['foo'],
+ 'acl-users-order': 'allow-deny',
+ }
+ self.assertEqual(base_backend.check_profile_acls(username, acls), False)
+ acls = {
+ 'acl-users-allow': ['foo'],
+ 'acl-users-deny': ['ALL'],
+ 'acl-users-order': 'allow-deny',
+ }
+ self.assertEqual(base_backend.check_profile_acls(username, acls), False)
+ acls = {
+ 'acl-users-deny': ['ALL'],
+ 'acl-users-allow': ['foo'],
+ 'acl-users-order': 'deny-allow',
+ }
+ self.assertEqual(base_backend.check_profile_acls(username, acls), True)
+ acls = {
+ 'acl-users-deny': ['foo'],
+ 'acl-users-allow': ['ALL'],
+ 'acl-users-order': 'deny-allow',
+ }
+ self.assertEqual(base_backend.check_profile_acls(username, acls), True)
+
def test_suite():
from unittest import TestSuite, makeSuite
suite = TestSuite()
diff --git a/x2gobroker/tests/test_backend_inifile.py b/x2gobroker/tests/test_backend_inifile.py
index bec9381..9a92730 100644
--- a/x2gobroker/tests/test_backend_inifile.py
+++ b/x2gobroker/tests/test_backend_inifile.py
@@ -44,7 +44,7 @@ class TestX2GoBrokerBackendInifile(unittest.TestCase):
def test_getprofiledefaults(self):
inifile_backend = x2gobroker.backends.inifile.X2GoBroker(profile_config_file='../../etc/x2gobroker-sessionprofiles.conf')
_profile_defaults = inifile_backend.get_profile_defaults()
- _expected_profile_defaults = x2gobroker.defaults.X2GOBROKER_SESSIONPROFILE_DEFAULTS['DEFAULT']
+ _expected_profile_defaults = copy.deepcopy(x2gobroker.defaults.X2GOBROKER_SESSIONPROFILE_DEFAULTS['DEFAULT'])
for key in copy.deepcopy(_expected_profile_defaults):
if key.startswith('acl-'):
del _expected_profile_defaults[key]
diff --git a/x2gobroker/tests/test_web_plain_base.py b/x2gobroker/tests/test_web_plain_base.py
index 29cd656..25530c4 100644
--- a/x2gobroker/tests/test_web_plain_base.py
+++ b/x2gobroker/tests/test_web_plain_base.py
@@ -22,9 +22,7 @@ import tempfile
from paste.fixture import TestApp
from paste.request import EnvironHeaders
from nose.tools import *
-
-# remove later, for debugging
-import subprocess
+import copy
# Python X2GoBroker modules
import x2gobroker.backends.base
@@ -115,6 +113,7 @@ auth-mech = testsuite
def test_suite():
+ x2gobroker.defaults.X2GOBROKER_CONFIG_DEFAULTS.update({'base': {'enable': True, 'auth-mech': 'pam', }, })
from unittest import TestSuite, makeSuite
suite = TestSuite()
suite.addTest(makeSuite(TestX2GoBrokerWebPlainBase))
hooks/post-receive
--
x2gobroker.git (HTTP(S) Session broker for X2Go)
This is an automated email from the git hooks/post-receive script. It was
generated because a ref change was pushed to the repository containing
the project "x2gobroker.git" (HTTP(S) Session broker for X2Go).
More information about the x2go-commits
mailing list