settings: Correctly handle passing empty anonymous group.

If empty anonymous group is passed for a setting value in
an API request, the setting is set to "Nobody" group.
This commit is contained in:
Sahil Batra 2025-02-17 20:16:18 +05:30 committed by Tim Abbott
parent dd99ed019a
commit cf3315bd18
7 changed files with 138 additions and 14 deletions

View File

@ -127,6 +127,13 @@ def get_user_group_by_id_in_realm(
raise JsonableError(_("Invalid user group"))
def get_system_user_group_by_name(group_name: str, realm_id: int) -> NamedUserGroup:
if group_name not in SystemGroups.GROUP_DISPLAY_NAME_MAP:
raise JsonableError(_("Invalid system group name."))
return NamedUserGroup.objects.get(name=group_name, realm_id=realm_id, is_system_group=True)
def access_user_group_to_read_membership(user_group_id: int, realm: Realm) -> NamedUserGroup:
return get_user_group_by_id_in_realm(user_group_id, realm, for_read=True)
@ -1031,6 +1038,7 @@ def get_server_supported_permission_settings() -> ServerSupportedPermissionSetti
def parse_group_setting_value(
setting_value: int | AnonymousSettingGroupDict,
nobody_group: NamedUserGroup,
) -> int | AnonymousSettingGroupDict:
if isinstance(setting_value, int):
return setting_value
@ -1038,6 +1046,9 @@ def parse_group_setting_value(
if len(setting_value.direct_members) == 0 and len(setting_value.direct_subgroups) == 1:
return setting_value.direct_subgroups[0]
if len(setting_value.direct_members) == 0 and len(setting_value.direct_subgroups) == 0:
return nobody_group.id
return setting_value

View File

@ -2252,6 +2252,33 @@ class RealmAPITest(ZulipTestCase):
realm = get_realm("zulip")
self.assertEqual(getattr(realm, setting_name), admins_group.usergroup_ptr)
permission_configuration = Realm.REALM_PERMISSION_GROUP_SETTINGS[setting_name]
nobody_group = NamedUserGroup.objects.get(
name=SystemGroups.NOBODY, realm=realm, is_system_group=True
)
result = self.client_patch(
"/json/realm",
{
setting_name: orjson.dumps(
{
"new": {
"direct_members": [],
"direct_subgroups": [],
},
"old": admins_group.id,
}
).decode()
},
)
if not permission_configuration.allow_nobody_group:
self.assert_json_error(
result, f"'{setting_name}' setting cannot be set to 'role:nobody' group."
)
else:
self.assert_json_success(result)
realm = get_realm("zulip")
self.assertEqual(getattr(realm, setting_name), nobody_group.usergroup_ptr)
def test_update_realm_properties(self) -> None:
for prop in Realm.property_types:
# push_notifications_enabled is maintained by the server, not via the API.

View File

@ -708,6 +708,28 @@ class TestCreateStreams(ZulipTestCase):
# testing another setting value.
stream.delete()
nobody_group = NamedUserGroup.objects.get(
name="role:nobody", is_system_group=True, realm=realm
)
subscriptions = [{"name": "new_stream", "description": "New stream"}]
extra_post_data[setting_name] = orjson.dumps(
{"direct_members": [], "direct_subgroups": []}
).decode()
result = self.subscribe_via_post(
user,
subscriptions,
extra_post_data,
allow_fail=True,
subdomain="zulip",
)
self.assert_json_success(result)
stream = get_stream("new_stream", realm)
self.assertEqual(getattr(stream, setting_name).id, nobody_group.id)
# Delete the created stream, so we can create a new one for
# testing another setting value.
stream.delete()
subscriptions = [{"name": "new_stream", "description": "New stream"}]
owners_group = NamedUserGroup.objects.get(
name="role:owners", is_system_group=True, realm=realm
@ -728,9 +750,6 @@ class TestCreateStreams(ZulipTestCase):
stream.delete()
subscriptions = [{"name": "new_stream", "description": "New stream"}]
nobody_group = NamedUserGroup.objects.get(
name="role:nobody", is_system_group=True, realm=realm
)
extra_post_data[setting_name] = orjson.dumps(nobody_group.id).decode()
result = self.subscribe_via_post(
user,

View File

@ -31,6 +31,7 @@ from zerver.actions.user_groups import (
)
from zerver.actions.users import do_deactivate_user
from zerver.lib.create_user import create_user
from zerver.lib.exceptions import JsonableError
from zerver.lib.mention import silent_mention_syntax_for_user
from zerver.lib.streams import ensure_stream
from zerver.lib.test_classes import ZulipTestCase
@ -47,6 +48,7 @@ from zerver.lib.user_groups import (
get_recursive_subgroups_union_for_groups,
get_role_based_system_groups_dict,
get_subgroup_ids,
get_system_user_group_by_name,
get_user_group_member_ids,
has_user_group_access_for_subgroup,
is_any_user_in_group,
@ -475,6 +477,19 @@ class UserGroupTestCase(ZulipTestCase):
self.assertTrue(has_user_group_access_for_subgroup(zulip_group, iago))
self.assertTrue(has_user_group_access_for_subgroup(moderators_group, iago))
def test_get_system_user_group_by_name(self) -> None:
realm = get_realm("zulip")
moderators_group = NamedUserGroup.objects.get(
name=SystemGroups.MODERATORS, realm=realm, is_system_group=True
)
self.assertEqual(
get_system_user_group_by_name(SystemGroups.MODERATORS, realm.id), moderators_group
)
with self.assertRaisesRegex(JsonableError, "Invalid system group name."):
get_system_user_group_by_name("hamletcharacters", realm.id)
class UserGroupAPITestCase(UserGroupTestCase):
def test_user_group_create(self) -> None:
@ -709,6 +724,23 @@ class UserGroupAPITestCase(UserGroupTestCase):
# We do not create a new UserGroup object in such case.
self.assertEqual(getattr(help_group, setting_name).id, moderators_group.id)
params = {
"name": "devops",
"members": orjson.dumps([hamlet.id]).decode(),
"description": "Devops team",
}
params[setting_name] = orjson.dumps(
{
"direct_members": [],
"direct_subgroups": [],
}
).decode()
result = self.client_post("/json/user_groups/create", info=params)
self.assert_json_success(result)
devops_group = NamedUserGroup.objects.get(name="devops", realm=hamlet.realm)
# We do not create a new UserGroup object in such case.
self.assertEqual(getattr(devops_group, setting_name).id, nobody_group.id)
internet_group = NamedUserGroup.objects.get(
name="role:internet", realm=hamlet.realm, is_system_group=True
)
@ -995,14 +1027,27 @@ class UserGroupAPITestCase(UserGroupTestCase):
[marketing_group, moderators_group],
)
params[setting_name] = orjson.dumps({"new": marketing_group.id}).decode()
previous_setting_id = getattr(support_group, setting_name).id
params[setting_name] = orjson.dumps(
{
"new": {
"direct_members": [],
"direct_subgroups": [],
}
}
).decode()
result = self.client_patch(f"/json/user_groups/{support_group.id}", info=params)
self.assert_json_success(result)
support_group = NamedUserGroup.objects.get(name="support", realm=hamlet.realm)
# Test that the previous UserGroup object is deleted.
self.assertFalse(UserGroup.objects.filter(id=previous_setting_id).exists())
self.assertEqual(getattr(support_group, setting_name).id, nobody_group.id)
params[setting_name] = orjson.dumps({"new": marketing_group.id}).decode()
result = self.client_patch(f"/json/user_groups/{support_group.id}", info=params)
self.assert_json_success(result)
support_group = NamedUserGroup.objects.get(name="support", realm=hamlet.realm)
# Test that the previous UserGroup object is deleted.
self.assertFalse(UserGroup.objects.filter(id=previous_setting_id).exists())
self.assertEqual(getattr(support_group, setting_name).id, marketing_group.id)
owners_group = NamedUserGroup.objects.get(

View File

@ -41,11 +41,13 @@ from zerver.lib.user_groups import (
GroupSettingChangeRequest,
access_user_group_for_setting,
get_group_setting_value_for_api,
get_system_user_group_by_name,
parse_group_setting_value,
validate_group_setting_value_change,
)
from zerver.lib.validator import check_capped_url, check_string
from zerver.models import Realm, RealmReactivationStatus, RealmUserDefault, UserProfile
from zerver.models.groups import SystemGroups
from zerver.models.realms import DigestWeekdayEnum, OrgTypeEnum
from zerver.views.user_settings import (
check_information_density_setting_values,
@ -352,6 +354,7 @@ def update_realm(
else:
data[k] = v
nobody_group = get_system_user_group_by_name(SystemGroups.NOBODY, user_profile.realm_id)
for setting_name, permission_configuration in Realm.REALM_PERMISSION_GROUP_SETTINGS.items():
expected_current_setting_value = None
assert setting_name in req_group_setting_vars
@ -359,10 +362,12 @@ def update_realm(
continue
setting_value = req_group_setting_vars[setting_name]
new_setting_value = parse_group_setting_value(setting_value.new)
new_setting_value = parse_group_setting_value(setting_value.new, nobody_group)
if setting_value.old is not None:
expected_current_setting_value = parse_group_setting_value(setting_value.old)
expected_current_setting_value = parse_group_setting_value(
setting_value.old, nobody_group
)
current_value = getattr(realm, setting_name)
current_setting_api_value = get_group_setting_value_for_api(current_value)

View File

@ -93,6 +93,7 @@ from zerver.lib.user_groups import (
access_user_group_for_setting,
get_group_setting_value_for_api,
get_role_based_system_groups_dict,
get_system_user_group_by_name,
parse_group_setting_value,
validate_group_setting_value_change,
)
@ -100,6 +101,7 @@ from zerver.lib.user_topics import get_users_with_user_topic_visibility_policy
from zerver.lib.users import access_bot_by_id, bulk_access_users_by_email, bulk_access_users_by_id
from zerver.lib.utils import assert_is_not_none
from zerver.models import Realm, Stream, UserMessage, UserProfile, UserTopic
from zerver.models.groups import SystemGroups
from zerver.models.users import get_system_bot
@ -393,6 +395,7 @@ def update_stream_backend(
check_stream_name_available(user_profile.realm, new_name)
do_rename_stream(stream, new_name, user_profile)
nobody_group = get_system_user_group_by_name(SystemGroups.NOBODY, user_profile.realm_id)
request_settings_dict = locals()
for setting_name, permission_configuration in Stream.stream_permission_group_settings.items():
assert setting_name in request_settings_dict
@ -400,11 +403,13 @@ def update_stream_backend(
continue
setting_value = request_settings_dict[setting_name]
new_setting_value = parse_group_setting_value(setting_value.new)
new_setting_value = parse_group_setting_value(setting_value.new, nobody_group)
expected_current_setting_value = None
if setting_value.old is not None:
expected_current_setting_value = parse_group_setting_value(setting_value.old)
expected_current_setting_value = parse_group_setting_value(
setting_value.old, nobody_group
)
current_value = getattr(stream, setting_name)
current_setting_api_value = get_group_setting_value_for_api(current_value)
@ -618,7 +623,11 @@ def add_subscriptions_backend(
assert setting_name in request_settings_dict
if request_settings_dict[setting_name] is not None:
setting_request_value = request_settings_dict[setting_name]
setting_value = parse_group_setting_value(setting_request_value)
if system_groups_name_dict is None:
system_groups_name_dict = get_role_based_system_groups_dict(realm)
setting_value = parse_group_setting_value(
setting_request_value, system_groups_name_dict[SystemGroups.NOBODY]
)
group_settings_map[setting_name] = access_user_group_for_setting(
setting_value,
user_profile,

View File

@ -33,6 +33,7 @@ from zerver.lib.user_groups import (
get_direct_memberships_of_users,
get_group_setting_value_for_api,
get_subgroup_ids,
get_system_user_group_by_name,
get_user_group_direct_member_ids,
get_user_group_member_ids,
is_user_in_group,
@ -43,6 +44,7 @@ from zerver.lib.user_groups import (
)
from zerver.lib.users import access_user_by_id, user_ids_to_users
from zerver.models import NamedUserGroup, UserProfile
from zerver.models.groups import SystemGroups
from zerver.models.users import get_system_bot
from zerver.views.streams import compose_views
@ -71,12 +73,15 @@ def add_user_group(
group_settings_map = {}
request_settings_dict = locals()
nobody_group = get_system_user_group_by_name(SystemGroups.NOBODY, user_profile.realm_id)
for setting_name, permission_config in NamedUserGroup.GROUP_PERMISSION_SETTINGS.items():
if setting_name not in request_settings_dict: # nocoverage
continue
if request_settings_dict[setting_name] is not None:
setting_value = parse_group_setting_value(request_settings_dict[setting_name])
setting_value = parse_group_setting_value(
request_settings_dict[setting_name], nobody_group
)
setting_value_group = access_user_group_for_setting(
setting_value,
user_profile,
@ -159,6 +164,7 @@ def edit_user_group(
do_update_user_group_description(user_group, description, acting_user=user_profile)
request_settings_dict = locals()
nobody_group = get_system_user_group_by_name(SystemGroups.NOBODY, user_profile.realm_id)
for setting_name, permission_config in NamedUserGroup.GROUP_PERMISSION_SETTINGS.items():
if setting_name not in request_settings_dict: # nocoverage
continue
@ -167,11 +173,13 @@ def edit_user_group(
continue
setting_value = request_settings_dict[setting_name]
new_setting_value = parse_group_setting_value(setting_value.new)
new_setting_value = parse_group_setting_value(setting_value.new, nobody_group)
expected_current_setting_value = None
if setting_value.old is not None:
expected_current_setting_value = parse_group_setting_value(setting_value.old)
expected_current_setting_value = parse_group_setting_value(
setting_value.old, nobody_group
)
current_value = getattr(user_group, setting_name)
current_setting_api_value = get_group_setting_value_for_api(current_value)