From a331b4f64d5d5dc3be6f0001f12eb34fe184b380 Mon Sep 17 00:00:00 2001 From: Steve Howell Date: Fri, 6 Oct 2017 08:35:55 -0700 Subject: [PATCH] Optimize query_all_subs_by_stream(). Using lightweight objects will speed up adding new users to realms. We also sort the query results, which lets us itertools.groupby to more efficiently build the data structure. Profiling on a large data set shows about a 25x speedup for this function, and before the optimization, this function accounts for most of the time spend in bulk_add_subscriptions. There's a lot less memory to allocate. I didn't measure the memory difference. When we test-deployed this to chat.zulip.org, we got about a 6x speedup. --- zerver/lib/actions.py | 54 +++++++++++++++++++++++++++++++++++-------- 1 file changed, 45 insertions(+), 9 deletions(-) diff --git a/zerver/lib/actions.py b/zerver/lib/actions.py index 6a02e5cb31..dcc1a8ab0d 100644 --- a/zerver/lib/actions.py +++ b/zerver/lib/actions.py @@ -1951,16 +1951,52 @@ def get_peer_user_ids_for_stream_change(stream, altered_user_ids, subscribed_use # structure to stay up-to-date. return set(active_user_ids(stream.realm_id)) - set(altered_user_ids) -def query_all_subs_by_stream(streams): - # type: (Iterable[Stream]) -> Dict[int, List[UserProfile]] - all_subs = Subscription.objects.filter(recipient__type=Recipient.STREAM, - recipient__type_id__in=[stream.id for stream in streams], - user_profile__is_active=True, - active=True).select_related('recipient', 'user_profile') +class UserLite(object): + ''' + This is a lightweight object that we use for highly + optimized codepaths that sometimes process ~30k subscription + rows when bulk-adding streams for newly registered users. + + This little wrapper is a lot less expensive than full-blown + UserProfile objects, but it lets the rest of the code work + with the nice object syntax. + + Long term, we want to avoid sending around all of these emails, + so we can make this class go away and just deal with user_ids. + ''' + def __init__(self, user_id, email): + # type: (int, Text) -> None + self.id = user_id + self.email = email + +def query_all_subs_by_stream(streams): + # type: (Iterable[Stream]) -> Dict[int, List[UserLite]] + all_subs = Subscription.objects.filter( + recipient__type=Recipient.STREAM, + recipient__type_id__in=[stream.id for stream in streams], + user_profile__is_active=True, + active=True + ).values( + 'recipient__type_id', + 'user_profile_id', + 'user_profile__email', + ).order_by( + 'recipient__type_id', + ) + + get_stream_id = itemgetter('recipient__type_id') + + all_subs_by_stream = defaultdict(list) # type: Dict[int, List[UserLite]] + for stream_id, rows in itertools.groupby(all_subs, get_stream_id): + users = [ + UserLite( + user_id=row['user_profile_id'], + email=row['user_profile__email'], + ) + for row in rows + ] + all_subs_by_stream[stream_id] = users - all_subs_by_stream = defaultdict(list) # type: Dict[int, List[UserProfile]] - for sub in all_subs: - all_subs_by_stream[sub.recipient.type_id].append(sub.user_profile) return all_subs_by_stream def bulk_add_subscriptions(streams, users, from_stream_creation=False, acting_user=None):