From b76dc9bf4eccae2a442f3c365214aa375fcbd2cd Mon Sep 17 00:00:00 2001 From: Eklavya Sharma Date: Wed, 29 Jun 2016 20:43:28 +0530 Subject: [PATCH] zerver/lib/upload.py: Fix string types. --- zerver/lib/upload.py | 60 ++++++++++++++++++------------------- zerver/tests/test_upload.py | 6 ++-- 2 files changed, 32 insertions(+), 34 deletions(-) diff --git a/zerver/lib/upload.py b/zerver/lib/upload.py index b6010d0d4d..63ea15363f 100644 --- a/zerver/lib/upload.py +++ b/zerver/lib/upload.py @@ -4,7 +4,6 @@ from typing import Optional, Tuple, Mapping, Any from django.utils.translation import ugettext as _ from django.conf import settings from django.template.defaultfilters import slugify -from django.utils.encoding import force_text from django.core.files import File from django.http import HttpRequest from jinja2 import Markup as mark_safe @@ -12,6 +11,7 @@ import unicodedata from zerver.lib.avatar import user_avatar_hash from zerver.lib.request import JsonableError +from zerver.lib.str_utils import force_text, force_str, NonBinaryStr from boto.s3.bucket import Bucket from boto.s3.key import Key @@ -26,10 +26,9 @@ from six.moves import urllib import base64 import os import re -import six from PIL import Image, ImageOps from six import binary_type, text_type -from six.moves import cStringIO as StringIO +import io import random import logging @@ -48,8 +47,8 @@ import logging # "file name" is the original filename provided by the user run # through a sanitization function. -def sanitize_name(value): - # type: (six.text_type) -> str +def sanitize_name(raw_value): + # type: (NonBinaryStr) -> text_type """ Sanitizes a value to be safe to store in a Linux filesystem, in S3, and in a URL. So unicode is allowed, but not special @@ -61,27 +60,27 @@ def sanitize_name(value): * adding '.' and '_' to the list of allowed characters. * preserving the case of the value. """ - value = force_text(value) + value = force_text(raw_value) value = unicodedata.normalize('NFKC', value) value = re.sub('[^\w\s._-]', '', value, flags=re.U).strip() return mark_safe(re.sub('[-\s]+', '-', value, flags=re.U)) def random_name(bytes=60): - # type: (int) -> str - return base64.urlsafe_b64encode(os.urandom(bytes)) + # type: (int) -> text_type + return base64.urlsafe_b64encode(os.urandom(bytes)).decode('utf-8') class BadImageError(JsonableError): pass def resize_avatar(image_data): - # type: (str) -> str + # type: (binary_type) -> binary_type AVATAR_SIZE = 100 try: - im = Image.open(StringIO(image_data)) + im = Image.open(io.BytesIO(image_data)) im = ImageOps.fit(im, (AVATAR_SIZE, AVATAR_SIZE), Image.ANTIALIAS) except IOError: raise BadImageError("Could not decode avatar image; did you upload an image file?") - out = StringIO() + out = io.BytesIO() im.save(out, format='png') return out.getvalue() @@ -89,7 +88,7 @@ def resize_avatar(image_data): class ZulipUploadBackend(object): def upload_message_image(self, uploaded_file_name, content_type, file_data, user_profile, target_realm=None): - # type: (str, str, str, UserProfile, Optional[Realm]) -> str + # type: (text_type, text_type, binary_type, UserProfile, Optional[Realm]) -> text_type raise NotImplementedError() def upload_avatar_image(self, user_file, user_profile, email): @@ -122,45 +121,43 @@ def upload_image_to_s3( user_profile, contents, ): - # type: (text_type, text_type, text_type, UserProfile, text_type) -> None + # type: (NonBinaryStr, text_type, text_type, UserProfile, binary_type) -> None conn = S3Connection(settings.S3_KEY, settings.S3_SECRET_KEY) - bucket = get_bucket(conn, bucket_name) + bucket = get_bucket(conn, force_str(bucket_name)) key = Key(bucket) - key.key = file_name + key.key = force_str(file_name) key.set_metadata("user_profile_id", str(user_profile.id)) key.set_metadata("realm_id", str(user_profile.realm.id)) if content_type: - headers = {'Content-Type': content_type} + headers = {'Content-Type': force_str(content_type)} else: headers = None key.set_contents_from_string(contents, headers=headers) def get_file_info(request, user_file): - # type: (HttpRequest, File) -> Tuple[str, str] + # type: (HttpRequest, File) -> Tuple[text_type, text_type] - # `user_file.name` is a unicode whereas it should be an ascii - # so convert it into an ascii. - uploaded_file_name = user_file.name.encode('ascii') + uploaded_file_name = user_file.name content_type = request.GET.get('mimetype') if content_type is None: - content_type = guess_type(uploaded_file_name)[0] + content_type = force_text(guess_type(uploaded_file_name)[0]) else: uploaded_file_name = uploaded_file_name + guess_extension(content_type) - uploaded_file_name = urllib.parse.unquote(uploaded_file_name).decode('utf-8') + uploaded_file_name = urllib.parse.unquote(uploaded_file_name) return uploaded_file_name, content_type def get_signed_upload_url(path): - # type: (str) -> str + # type: (text_type) -> text_type conn = S3Connection(settings.S3_KEY, settings.S3_SECRET_KEY) - return conn.generate_url(15, 'GET', bucket=settings.S3_AUTH_UPLOADS_BUCKET, key=path) + return force_text(conn.generate_url(15, 'GET', bucket=settings.S3_AUTH_UPLOADS_BUCKET, key=force_str(path))) def get_realm_for_filename(path): - # type: (str) -> int + # type: (text_type) -> Optional[int] conn = S3Connection(settings.S3_KEY, settings.S3_SECRET_KEY) key = get_bucket(conn, settings.S3_AUTH_UPLOADS_BUCKET).get_key(path) if key is None: @@ -170,7 +167,7 @@ def get_realm_for_filename(path): class S3UploadBackend(ZulipUploadBackend): def upload_message_image(self, uploaded_file_name, content_type, file_data, user_profile, target_realm=None): - # type: (str, str, str, UserProfile, Optional[Realm]) -> str + # type: (text_type, text_type, binary_type, UserProfile, Optional[Realm]) -> text_type bucket_name = settings.S3_AUTH_UPLOADS_BUCKET s3_file_name = "/".join([ str(target_realm.id if target_realm is not None else user_profile.realm.id), @@ -234,7 +231,7 @@ class S3UploadBackend(ZulipUploadBackend): ### Local def mkdirs(path): - # type: (str) -> None + # type: (text_type) -> None dirname = os.path.dirname(path) if not os.path.isdir(dirname): os.makedirs(dirname) @@ -247,6 +244,7 @@ def write_local_file(type, path, file_data): f.write(file_data) def get_local_file_path(path_id): + # type: (text_type) -> Optional[text_type] local_path = os.path.join(settings.LOCAL_UPLOADS_DIR, 'files', path_id) if os.path.isfile(local_path): return local_path @@ -255,7 +253,7 @@ def get_local_file_path(path_id): class LocalUploadBackend(ZulipUploadBackend): def upload_message_image(self, uploaded_file_name, content_type, file_data, user_profile, target_realm=None): - # type: (str, str, str, UserProfile, Optional[Realm]) -> str + # type: (text_type, text_type, binary_type, UserProfile, Optional[Realm]) -> text_type # Split into 256 subdirectories to prevent directories from getting too big path = "/".join([ str(user_profile.realm.id), @@ -305,7 +303,7 @@ def upload_avatar_image(user_file, user_profile, email): upload_backend.upload_avatar_image(user_file, user_profile, email) def upload_message_image(uploaded_file_name, content_type, file_data, user_profile, target_realm=None): - # type: (str, str, str, UserProfile, Optional[Realm]) -> str + # type: (text_type, text_type, binary_type, UserProfile, Optional[Realm]) -> text_type return upload_backend.upload_message_image(uploaded_file_name, content_type, file_data, user_profile, target_realm=target_realm) @@ -326,11 +324,11 @@ def claim_attachment(user_profile, path_id, message, is_message_realm_public): return False def create_attachment(file_name, path_id, user_profile): - # type: (str, str, UserProfile) -> bool + # type: (text_type, text_type, UserProfile) -> bool Attachment.objects.create(file_name=file_name, path_id=path_id, owner=user_profile, realm=user_profile.realm) return True def upload_message_image_from_request(request, user_file, user_profile): - # type: (HttpRequest, File, UserProfile) -> str + # type: (HttpRequest, File, UserProfile) -> text_type uploaded_file_name, content_type = get_file_info(request, user_file) return upload_message_image(uploaded_file_name, content_type, user_file.read(), user_profile) diff --git a/zerver/tests/test_upload.py b/zerver/tests/test_upload.py index 52e6ebc0a2..6a2646cd88 100644 --- a/zerver/tests/test_upload.py +++ b/zerver/tests/test_upload.py @@ -271,7 +271,7 @@ class LocalStorageTest(AuthedTestCase): # type: () -> None sender_email = "hamlet@zulip.com" user_profile = get_user_profile_by_email(sender_email) - uri = upload_message_image('dummy.txt', 'text/plain', 'zulip!', user_profile) + uri = upload_message_image(u'dummy.txt', u'text/plain', b'zulip!', user_profile) base = '/user_uploads/' self.assertEquals(base, uri[:len(base)]) @@ -316,7 +316,7 @@ class S3Test(AuthedTestCase): sender_email = "hamlet@zulip.com" user_profile = get_user_profile_by_email(sender_email) - uri = upload_message_image('dummy.txt', 'text/plain', 'zulip!', user_profile) + uri = upload_message_image(u'dummy.txt', u'text/plain', b'zulip!', user_profile) base = '/user_uploads/' self.assertEquals(base, uri[:len(base)]) @@ -336,7 +336,7 @@ class S3Test(AuthedTestCase): sender_email = "hamlet@zulip.com" user_profile = get_user_profile_by_email(sender_email) - uri = upload_message_image('dummy.txt', 'text/plain', 'zulip!', user_profile) + uri = upload_message_image(u'dummy.txt', u'text/plain', b'zulip!', user_profile) path_id = re.sub('/user_uploads/', '', uri) self.assertTrue(delete_message_image(path_id))