diff --git a/zilencer/management/commands/populate_db.py b/zilencer/management/commands/populate_db.py index 890ec3c656..3270bed3d5 100644 --- a/zilencer/management/commands/populate_db.py +++ b/zilencer/management/commands/populate_db.py @@ -5,29 +5,18 @@ from __future__ import print_function from django.core.management.base import BaseCommand, CommandParser from django.utils.timezone import now -from django.contrib.sites.models import Site -from zerver.models import Message, UserProfile, Stream, Recipient, Client, \ - Subscription, Huddle, get_huddle, Realm, UserMessage, RealmAlias, \ - get_huddle_hash, clear_database, get_client, get_user_profile_by_id, \ - email_to_domain, email_to_username -from zerver.lib.actions import STREAM_ASSIGNMENT_COLORS, do_send_message, set_default_streams, \ - do_activate_user, do_deactivate_user, do_change_password, do_change_is_admin,\ - do_change_bot_type -from zerver.lib.parallel import run_parallel -from django.db.models import Count +from zerver.models import Message, UserProfile, Stream, Recipient, \ + Subscription, get_huddle, Realm, UserMessage, RealmAlias, \ + clear_database, get_client, get_user_profile_by_id, \ + email_to_username +from zerver.lib.actions import STREAM_ASSIGNMENT_COLORS, do_send_message, \ + do_change_is_admin from django.conf import settings -from zerver.lib.bulk_create import bulk_create_realms, \ - bulk_create_streams, bulk_create_users, bulk_create_huddles, \ - bulk_create_clients -from zerver.lib.timestamp import timestamp_to_datetime -from zerver.models import MAX_MESSAGE_LENGTH +from zerver.lib.bulk_create import bulk_create_streams, bulk_create_users from zerver.models import DefaultStream, get_stream, get_realm from zilencer.models import Deployment -import ujson -import datetime import random -import glob import os from optparse import make_option from six import text_type @@ -116,12 +105,6 @@ class Command(BaseCommand): action="store_true", help='Whether to delete all the existing messages.') - parser.add_argument('--replay-old-messages', - action="store_true", - default=False, - dest='replay_old_messages', - help='Whether to replace the log of old messages.') - def handle(self, **options): # type: (**Any) -> None if options["percent_huddles"] + options["percent_personals"] > 100: @@ -293,8 +276,6 @@ class Command(BaseCommand): UserMessage.objects.all().update(flags=UserMessage.flags.read) self.stdout.write("Successfully populated test database.\n") - if options["replay_old_messages"]: - restore_saved_messages() recipient_hash = {} # type: Dict[int, Recipient] def get_recipient_by_id(rid): @@ -303,456 +284,6 @@ def get_recipient_by_id(rid): return recipient_hash[rid] return Recipient.objects.get(id=rid) -def restore_saved_messages(): - # type: () -> None - old_messages = [] # type: List[Dict[str, Any]] - duplicate_suppression_hash = {} # type: Dict[str, bool] - - stream_dict = {} # type: Dict[Tuple[text_type, text_type], Tuple[text_type, text_type]] - user_set = set() # type: Set[Tuple[text_type, text_type, text_type, bool]] - email_set = set([u.email for u in UserProfile.objects.all()]) # type: Set[text_type] - realm_set = set() # type: Set[text_type] - # Initial client_set is nonempty temporarily because we don't have - # clients in logs at all right now -- later we can start with nothing. - client_set = set(["populate_db", "website", "zephyr_mirror"]) - huddle_user_set = set() # type: Set[Tuple[text_type, ...]] - # First, determine all the objects our messages will need. - print(datetime.datetime.now(), "Creating realms/streams/etc...") - def process_line(line): - # type: (str) -> None - old_message_json = line.strip() - - # Due to populate_db's shakespeare mode, we have a lot of - # duplicate messages in our log that only differ in their - # logged ID numbers (same timestamp, content, etc.). With - # sqlite, bulk creating those messages won't work properly: in - # particular, the first 100 messages will actually only result - # in 20 rows ending up in the target table, which screws up - # the below accounting where for handling changing - # subscriptions, we assume that the Nth row populate_db - # created goes with the Nth non-subscription row of the input - # So suppress the duplicates when using sqlite. - if "sqlite" in settings.DATABASES["default"]["ENGINE"]: - tmp_message = ujson.loads(old_message_json) - tmp_message['id'] = '1' - duplicate_suppression_key = ujson.dumps(tmp_message) - if duplicate_suppression_key in duplicate_suppression_hash: - return - duplicate_suppression_hash[duplicate_suppression_key] = True - - old_message = ujson.loads(old_message_json) - message_type = old_message["type"] - - # Lower case emails and domains; it will screw up - # deduplication if we don't - def fix_email(email): - # type: (text_type) -> text_type - return email.strip().lower() - - if message_type in ["stream", "huddle", "personal"]: - old_message["sender_email"] = fix_email(old_message["sender_email"]) - # Fix the length on too-long messages before we start processing them - if len(old_message["content"]) > MAX_MESSAGE_LENGTH: - old_message["content"] = "[ This message was deleted because it was too long ]" - if message_type in ["subscription_added", "subscription_removed"]: - old_message["domain"] = old_message["domain"].lower() - old_message["user"] = fix_email(old_message["user"]) - elif message_type == "subscription_property": - old_message["user"] = fix_email(old_message["user"]) - elif message_type == "user_email_changed": - old_message["old_email"] = fix_email(old_message["old_email"]) - old_message["new_email"] = fix_email(old_message["new_email"]) - elif message_type.startswith("user_"): - old_message["user"] = fix_email(old_message["user"]) - elif message_type.startswith("enable_"): - old_message["user"] = fix_email(old_message["user"]) - - if message_type == 'personal': - old_message["recipient"][0]["email"] = fix_email(old_message["recipient"][0]["email"]) - elif message_type == "huddle": - for i in range(len(old_message["recipient"])): - old_message["recipient"][i]["email"] = fix_email(old_message["recipient"][i]["email"]) - - old_messages.append(old_message) - - if message_type in ["subscription_added", "subscription_removed"]: - stream_name = old_message["name"].strip() # type: text_type - canon_stream_name = stream_name.lower() - if canon_stream_name not in stream_dict: - stream_dict[(old_message["domain"], canon_stream_name)] = \ - (old_message["domain"], stream_name) - elif message_type == "user_created": - user_set.add((old_message["user"], old_message["full_name"], old_message["short_name"], False)) - elif message_type == "realm_created": - realm_set.add(old_message["domain"]) - - if message_type not in ["stream", "huddle", "personal"]: - return - - sender_email = old_message["sender_email"] - - domain = email_to_domain(sender_email) - realm_set.add(domain) - - if old_message["sender_email"] not in email_set: - user_set.add((old_message["sender_email"], - old_message["sender_full_name"], - old_message["sender_short_name"], - False)) - - if 'sending_client' in old_message: - client_set.add(old_message['sending_client']) - - if message_type == 'stream': - stream_name = old_message["recipient"].strip() - canon_stream_name = stream_name.lower() - if canon_stream_name not in stream_dict: - stream_dict[(domain, canon_stream_name)] = (domain, stream_name) - elif message_type == 'personal': - u = old_message["recipient"][0] - if u["email"] not in email_set: - user_set.add((u["email"], u["full_name"], u["short_name"], False)) - email_set.add(u["email"]) - elif message_type == 'huddle': - for u in old_message["recipient"]: - user_set.add((u["email"], u["full_name"], u["short_name"], False)) - if u["email"] not in email_set: - user_set.add((u["email"], u["full_name"], u["short_name"], False)) - email_set.add(u["email"]) - huddle_user_set.add(tuple(sorted(set(u["email"] for u in old_message["recipient"])))) - else: - raise ValueError('Bad message type') - - event_glob = os.path.join(settings.EVENT_LOG_DIR, 'events.*') - for filename in sorted(glob.glob(event_glob)): - with open(filename, "r") as message_log: - for line in message_log.readlines(): - process_line(line) - - stream_recipients = {} # type: Dict[Tuple[int, text_type], Recipient] - user_recipients = {} # type: Dict[text_type, Recipient] - huddle_recipients = {} # type: Dict[text_type, Recipient] - - # Then, create the objects our messages need. - print(datetime.datetime.now(), "Creating realms...") - bulk_create_realms(realm_set) - - realms = {} # type: Dict[text_type, Realm] - for realm in Realm.objects.all(): - realms[realm.domain] = realm - - print(datetime.datetime.now(), "Creating clients...") - bulk_create_clients(client_set) - - clients = {} # type: Dict[text_type, Client] - for client in Client.objects.all(): - clients[client.name] = client - - print(datetime.datetime.now(), "Creating streams...") - bulk_create_streams(realms, list(stream_dict.values())) - - streams = {} # type: Dict[int, Stream] - for stream in Stream.objects.all(): - streams[stream.id] = stream - for recipient in Recipient.objects.filter(type=Recipient.STREAM): - stream_recipients[(streams[recipient.type_id].realm_id, - streams[recipient.type_id].name.lower())] = recipient - - print(datetime.datetime.now(), "Creating users...") - bulk_create_users(realms, user_set, tos_version=settings.TOS_VERSION) - - users = {} # type: Dict[text_type, UserProfile] - users_by_id = {} # type: Dict[int, UserProfile] - for user_profile in UserProfile.objects.select_related().all(): - users[user_profile.email] = user_profile - users_by_id[user_profile.id] = user_profile - for recipient in Recipient.objects.filter(type=Recipient.PERSONAL): - user_recipients[users_by_id[recipient.type_id].email] = recipient - - print(datetime.datetime.now(), "Creating huddles...") - bulk_create_huddles(users, huddle_user_set) - - huddles_by_id = {} # type: Dict[int, Huddle] - for huddle in Huddle.objects.all(): - huddles_by_id[huddle.id] = huddle - for recipient in Recipient.objects.filter(type=Recipient.HUDDLE): - huddle_recipients[huddles_by_id[recipient.type_id].huddle_hash] = recipient - - # TODO: Add a special entry type in the log that is a subscription - # change and import those as we go to make subscription changes - # take effect! - print(datetime.datetime.now(), "Importing subscriptions...") - subscribers = {} # type: Dict[int, Set[int]] - for s in Subscription.objects.select_related().all(): - if s.active: - subscribers.setdefault(s.recipient.id, set()).add(s.user_profile.id) - - # Then create all the messages, without talking to the DB! - print(datetime.datetime.now(), "Importing messages, part 1...") - first_message_id = None - if Message.objects.exists(): - first_message_id = Message.objects.all().order_by("-id")[0].id + 1 - - messages_to_create = [] # type: List[Message] - for idx, old_message in enumerate(old_messages): - message_type = old_message["type"] - if message_type not in ["stream", "huddle", "personal"]: - continue - - message = Message() - - sender_email = old_message["sender_email"] - domain = email_to_domain(sender_email) - realm = realms[domain] - - message.sender = users[sender_email] - type_hash = {"stream": Recipient.STREAM, - "huddle": Recipient.HUDDLE, - "personal": Recipient.PERSONAL} - - if 'sending_client' in old_message: - message.sending_client = clients[old_message['sending_client']] - elif sender_email in ["othello@zulip.com", "iago@zulip.com", "prospero@zulip.com", - "cordelia@zulip.com", "hamlet@zulip.com"]: - message.sending_client = clients['populate_db'] - elif realm.domain == "zulip.com": - message.sending_client = clients["website"] - elif realm.domain == "mit.edu": - message.sending_client = clients['zephyr_mirror'] - else: - message.sending_client = clients['populate_db'] - - message.type = type_hash[message_type] - message.content = old_message["content"] - message.subject = old_message["subject"] - message.pub_date = timestamp_to_datetime(old_message["timestamp"]) - - if message.type == Recipient.PERSONAL: - message.recipient = user_recipients[old_message["recipient"][0]["email"]] - elif message.type == Recipient.STREAM: - message.recipient = stream_recipients[(realm.id, - old_message["recipient"].lower())] - elif message.type == Recipient.HUDDLE: - huddle_hash = get_huddle_hash([users[u["email"]].id - for u in old_message["recipient"]]) - message.recipient = huddle_recipients[huddle_hash] - else: - raise ValueError('Bad message type') - messages_to_create.append(message) - - print(datetime.datetime.now(), "Importing messages, part 2...") - Message.objects.bulk_create(messages_to_create) - messages_to_create = [] - - # Finally, create all the UserMessage objects - print(datetime.datetime.now(), "Importing usermessages, part 1...") - personal_recipients = {} # type: Dict[int, bool] - for r in Recipient.objects.filter(type = Recipient.PERSONAL): - personal_recipients[r.id] = True - - all_messages = Message.objects.all() # type: Sequence[Message] - user_messages_to_create = [] # type: List[UserMessage] - - messages_by_id = {} # type: Dict[int, Message] - for message in all_messages: - messages_by_id[message.id] = message - - if len(messages_by_id) == 0: - print(datetime.datetime.now(), "No old messages to replay") - return - - if first_message_id is None: - first_message_id = min(messages_by_id.keys()) - - tot_user_messages = 0 - pending_subs = {} # type: Dict[Tuple[int, int], bool] - current_message_id = first_message_id - pending_colors = {} # type: Dict[Tuple[text_type, text_type], text_type] - for old_message in old_messages: - message_type = old_message["type"] - if message_type == 'subscription_added': - stream_key = (realms[old_message["domain"]].id, old_message["name"].strip().lower()) - subscribers.setdefault(stream_recipients[stream_key].id, - set()).add(users[old_message["user"]].id) - pending_subs[(stream_recipients[stream_key].id, - users[old_message["user"]].id)] = True - continue - elif message_type == "subscription_removed": - stream_key = (realms[old_message["domain"]].id, old_message["name"].strip().lower()) - user_id = users[old_message["user"]].id - subscribers.setdefault(stream_recipients[stream_key].id, set()) - try: - subscribers[stream_recipients[stream_key].id].remove(user_id) - except KeyError: - print("Error unsubscribing %s from %s: not subscribed" % ( - old_message["user"], old_message["name"])) - pending_subs[(stream_recipients[stream_key].id, - users[old_message["user"]].id)] = False - continue - elif message_type == "user_activated" or message_type == "user_created": - # These are rare, so just handle them the slow way - user_profile = users[old_message["user"]] - join_date = timestamp_to_datetime(old_message['timestamp']) - do_activate_user(user_profile, log=False, join_date=join_date) - # Update the cache of users to show this user as activated - users_by_id[user_profile.id] = user_profile - users[old_message["user"]] = user_profile - continue - elif message_type == "user_deactivated": - user_profile = users[old_message["user"]] - do_deactivate_user(user_profile, log=False) - continue - elif message_type == "user_change_password": - # Just handle these the slow way - user_profile = users[old_message["user"]] - do_change_password(user_profile, old_message["pwhash"], log=False, - hashed_password=True) - continue - elif message_type == "user_change_full_name": - # Just handle these the slow way - user_profile = users[old_message["user"]] - user_profile.full_name = old_message["full_name"] - user_profile.save(update_fields=["full_name"]) - continue - elif message_type == "enable_desktop_notifications_changed": - # Just handle these the slow way - user_profile = users[old_message["user"]] - user_profile.enable_desktop_notifications = (old_message["enable_desktop_notifications"] != "false") - user_profile.save(update_fields=["enable_desktop_notifications"]) - continue - elif message_type == "enable_sounds_changed": - user_profile = users[old_message["user"]] - user_profile.enable_sounds = (old_message["enable_sounds"] != "false") - user_profile.save(update_fields=["enable_sounds"]) - elif message_type == "enable_offline_email_notifications_changed": - user_profile = users[old_message["user"]] - user_profile.enable_offline_email_notifications = ( - old_message["enable_offline_email_notifications"] != "false") - user_profile.save(update_fields=["enable_offline_email_notifications"]) - continue - elif message_type == "enable_offline_push_notifications_changed": - user_profile = users[old_message["user"]] - user_profile.enable_offline_push_notifications = ( - old_message["enable_offline_push_notifications"] != "false") - user_profile.save(update_fields=["enable_offline_push_notifications"]) - continue - elif message_type == "enable_online_push_notifications_changed": - user_profile = users[old_message["user"]] - user_profile.enable_online_push_notifications_changed = ( - old_message["enable_online_push_notifications_changed"] != "false") - user_profile.save(update_fields=["enable_online_push_notifications_changed"]) - continue - elif message_type == "default_streams": - set_default_streams(get_realm(old_message["domain"]), - old_message["streams"]) - continue - elif message_type == "subscription_property": - property_name = old_message.get("property") - if property_name == "stream_color" or property_name == "color": - color = old_message.get("color", old_message.get("value")) - pending_colors[(old_message["user"], - old_message["stream_name"].lower())] = color - elif property_name in ["in_home_view", "notifications"]: - # TODO: Handle this - continue - else: - raise RuntimeError("Unknown property %s" % (property_name,)) - continue - elif message_type == "realm_created": - # No action required - continue - elif message_type in ["user_email_changed", "update_onboarding", "update_message"]: - # TODO: Handle these - continue - if message_type not in ["stream", "huddle", "personal"]: - raise RuntimeError("Unexpected message type %s" % (message_type,)) - - message = messages_by_id[current_message_id] - current_message_id += 1 - - if message.recipient_id not in subscribers: - # Nobody received this message -- probably due to our - # subscriptions being out-of-date. - continue - - recipient_user_ids = set() # type: Set[int] - for user_profile_id in subscribers[message.recipient_id]: - recipient_user_ids.add(user_profile_id) - if message.recipient_id in personal_recipients: - # Include the sender in huddle recipients - recipient_user_ids.add(message.sender_id) - - for user_profile_id in recipient_user_ids: - if users_by_id[user_profile_id].is_active: - um = UserMessage(user_profile_id=user_profile_id, - message=message) - user_messages_to_create.append(um) - - if len(user_messages_to_create) > 100000: - tot_user_messages += len(user_messages_to_create) - UserMessage.objects.bulk_create(user_messages_to_create) - user_messages_to_create = [] - - print(datetime.datetime.now(), "Importing usermessages, part 2...") - tot_user_messages += len(user_messages_to_create) - UserMessage.objects.bulk_create(user_messages_to_create) - - print(datetime.datetime.now(), "Finalizing subscriptions...") - current_subs = {} # type: Dict[Tuple[int, int], bool] - current_subs_obj = {} # type: Dict[Tuple[int, int], Subscription] - for s in Subscription.objects.select_related().all(): - current_subs[(s.recipient_id, s.user_profile_id)] = s.active - current_subs_obj[(s.recipient_id, s.user_profile_id)] = s - - subscriptions_to_add = [] # type: List[Subscription] - subscriptions_to_change = [] # type: List[Tuple[Tuple[int, int], bool]] - for pending_sub in pending_subs.keys(): - (recipient_id, user_profile_id) = pending_sub - current_state = current_subs.get(pending_sub) - if pending_subs[pending_sub] == current_state: - # Already correct in the database - continue - elif current_state is not None: - subscriptions_to_change.append((pending_sub, pending_subs[pending_sub])) - continue - - s = Subscription(recipient_id=recipient_id, - user_profile_id=user_profile_id, - active=pending_subs[pending_sub]) - subscriptions_to_add.append(s) - Subscription.objects.bulk_create(subscriptions_to_add) - for (sub_tuple, active) in subscriptions_to_change: - current_subs_obj[sub_tuple].active = active - current_subs_obj[sub_tuple].save(update_fields=["active"]) - - subs = {} # type: Dict[Tuple[int, int], Subscription] - for sub in Subscription.objects.all(): - subs[(sub.user_profile_id, sub.recipient_id)] = sub - - # TODO: do restore of subscription colors -- we're currently not - # logging changes so there's little point in having the code :( - - print(datetime.datetime.now(), "Finished importing %s messages (%s usermessages)" % \ - (len(all_messages), tot_user_messages)) - - site = Site.objects.get_current() - site.domain = 'zulip.com' - site.save() - - print(datetime.datetime.now(), "Filling in user pointers...") - - # Set restored pointers to the very latest messages - for user_profile in UserProfile.objects.all(): - try: - top = UserMessage.objects.filter( - user_profile_id=user_profile.id).order_by("-message")[0] - user_profile.pointer = top.message_id - except IndexError: - user_profile.pointer = -1 - user_profile.save(update_fields=["pointer"]) - - print(datetime.datetime.now(), "Done replaying old messages") - # Create some test messages, including: # - multiple streams # - multiple subjects per stream