diff --git a/zerver/tests/test_auth_backends.py b/zerver/tests/test_auth_backends.py index e87a5f9b0e..a3354ffbd7 100644 --- a/zerver/tests/test_auth_backends.py +++ b/zerver/tests/test_auth_backends.py @@ -2396,6 +2396,92 @@ class TestZulipRemoteUserBackend(ZulipTestCase): self.assertEqual(len(mail.outbox), 1) self.assertIn('Zulip on Android', mail.outbox[0].body) + @override_settings(SEND_LOGIN_EMAILS=True) + @override_settings(AUTHENTICATION_BACKENDS=('zproject.backends.ZulipRemoteUserBackend', + 'zproject.backends.ZulipDummyBackend')) + def test_login_desktop_flow_otp_success_email(self) -> None: + user_profile = self.example_user('hamlet') + email = user_profile.email + user_profile.date_joined = timezone_now() - datetime.timedelta(seconds=61) + user_profile.save() + desktop_flow_otp = '1234abcd' * 8 + + # Verify that the right thing happens with an invalid-format OTP + result = self.client_post('/accounts/login/sso/', + dict(desktop_flow_otp="1234"), + REMOTE_USER=email) + self.assert_logged_in_user_id(None) + self.assert_json_error_contains(result, "Invalid OTP", 400) + + result = self.client_post('/accounts/login/sso/', + dict(desktop_flow_otp="invalido" * 8), + REMOTE_USER=email) + self.assert_logged_in_user_id(None) + self.assert_json_error_contains(result, "Invalid OTP", 400) + + result = self.client_post('/accounts/login/sso/', + dict(desktop_flow_otp=desktop_flow_otp), + REMOTE_USER=email) + self.assertEqual(result.status_code, 302) + redirect_url = result['Location'] + parsed_url = urllib.parse.urlparse(redirect_url) + query_params = urllib.parse.parse_qs(parsed_url.query) + self.assertEqual(parsed_url.scheme, 'zulip') + self.assertEqual(query_params["realm"], ['http://zulip.testserver']) + self.assertEqual(query_params["email"], [self.example_email("hamlet")]) + + encrypted_key = query_params["otp_encrypted_login_key"][0] + decrypted_key = otp_decrypt_api_key(encrypted_key, desktop_flow_otp) + auth_url = 'http://zulip.testserver/accounts/login/subdomain/{}'.format(decrypted_key) + + result = self.client_get(auth_url) + self.assertEqual(result.status_code, 302) + self.assert_logged_in_user_id(user_profile.id) + + @override_settings(SEND_LOGIN_EMAILS=True) + @override_settings(SSO_APPEND_DOMAIN="zulip.com") + @override_settings(AUTHENTICATION_BACKENDS=('zproject.backends.ZulipRemoteUserBackend', + 'zproject.backends.ZulipDummyBackend')) + def test_login_desktop_flow_otp_success_username(self) -> None: + user_profile = self.example_user('hamlet') + email = user_profile.email + remote_user = email_to_username(email) + user_profile.date_joined = timezone_now() - datetime.timedelta(seconds=61) + user_profile.save() + desktop_flow_otp = '1234abcd' * 8 + + # Verify that the right thing happens with an invalid-format OTP + result = self.client_post('/accounts/login/sso/', + dict(desktop_flow_otp="1234"), + REMOTE_USER=remote_user) + self.assert_logged_in_user_id(None) + self.assert_json_error_contains(result, "Invalid OTP", 400) + + result = self.client_post('/accounts/login/sso/', + dict(desktop_flow_otp="invalido" * 8), + REMOTE_USER=remote_user) + self.assert_logged_in_user_id(None) + self.assert_json_error_contains(result, "Invalid OTP", 400) + + result = self.client_post('/accounts/login/sso/', + dict(desktop_flow_otp=desktop_flow_otp), + REMOTE_USER=remote_user) + self.assertEqual(result.status_code, 302) + redirect_url = result['Location'] + parsed_url = urllib.parse.urlparse(redirect_url) + query_params = urllib.parse.parse_qs(parsed_url.query) + self.assertEqual(parsed_url.scheme, 'zulip') + self.assertEqual(query_params["realm"], ['http://zulip.testserver']) + self.assertEqual(query_params["email"], [self.example_email("hamlet")]) + + encrypted_key = query_params["otp_encrypted_login_key"][0] + decrypted_key = otp_decrypt_api_key(encrypted_key, desktop_flow_otp) + auth_url = 'http://zulip.testserver/accounts/login/subdomain/{}'.format(decrypted_key) + + result = self.client_get(auth_url) + self.assertEqual(result.status_code, 302) + self.assert_logged_in_user_id(user_profile.id) + def test_redirect_to(self) -> None: """This test verifies the behavior of the redirect_to logic in login_or_register_remote_user.""" diff --git a/zerver/views/auth.py b/zerver/views/auth.py index d0b164b4b8..e6419898a3 100644 --- a/zerver/views/auth.py +++ b/zerver/views/auth.py @@ -295,7 +295,8 @@ def create_response_for_otp_flow(key: str, otp: str, user_profile: UserProfile, @log_view_func @has_request_variables def remote_user_sso(request: HttpRequest, - mobile_flow_otp: Optional[str]=REQ(default=None)) -> HttpResponse: + mobile_flow_otp: Optional[str]=REQ(default=None), + desktop_flow_otp: Optional[str]=REQ(default=None)) -> HttpResponse: subdomain = get_subdomain(request) try: realm = get_realm(subdomain) # type: Optional[Realm] @@ -316,12 +317,15 @@ def remote_user_sso(request: HttpRequest, # enabled. validate_login_email(remote_user_to_email(remote_user)) - # Here we support the mobile flow for REMOTE_USER_BACKEND; we + # Here we support the mobile and desktop flow for REMOTE_USER_BACKEND; we # validate the data format and then pass it through to # login_or_register_remote_user if appropriate. if mobile_flow_otp is not None: if not is_valid_otp(mobile_flow_otp): raise JsonableError(_("Invalid OTP")) + if desktop_flow_otp is not None: + if not is_valid_otp(desktop_flow_otp): + raise JsonableError(_("Invalid OTP")) subdomain = get_subdomain(request) if realm is None: @@ -333,6 +337,8 @@ def remote_user_sso(request: HttpRequest, return login_or_register_remote_user(request, remote_user, user_profile, mobile_flow_otp=mobile_flow_otp, + desktop_flow_otp=desktop_flow_otp, + realm=realm, redirect_to=redirect_to) @csrf_exempt