From ca2ca030d2fb485e4ad052c70b28ca179fccc2b8 Mon Sep 17 00:00:00 2001 From: Alex Vandiver Date: Wed, 19 Apr 2023 19:28:58 +0000 Subject: [PATCH] migrations: Backfill missing RealmAuditLog entries for subscriptions. Backfill subscription realm audit log SUBSCRIPTION_CREATED events for users who are currently subscribed but don't have any subscription events, presumably due to some historical bug. This is important because those rows are necessary when reactivating a user who is currently soft-deactivated. For each stream, we find the subscribed users who have no subscription-related realm audit log entries, and create a `backfilled=True` subscription audit log entry which is the latest it could have been, based on UserMessage rows. We then optionally insert a `SUBSCRIPTION_DEACTIVATED` event if the current subscription is not active. --- zerver/lib/soft_deactivation.py | 4 +- .../0450_backfill_subscription_auditlogs.py | 151 ++++++++++++++++++ 2 files changed, 154 insertions(+), 1 deletion(-) create mode 100644 zerver/migrations/0450_backfill_subscription_auditlogs.py diff --git a/zerver/lib/soft_deactivation.py b/zerver/lib/soft_deactivation.py index 8e1ef3ad58..0b28641fa8 100644 --- a/zerver/lib/soft_deactivation.py +++ b/zerver/lib/soft_deactivation.py @@ -178,7 +178,9 @@ def add_missing_messages(user_profile: UserProfile) -> None: # That second tiebreak is important in case a user is subscribed # and then unsubscribed without any messages being sent in the # meantime. Without that tiebreak, we could end up incorrectly - # processing the ordering of those two subscription changes. + # processing the ordering of those two subscription changes. Note + # that this means we cannot backfill events unless there are no + # pre-existing events for this stream/user pair! 
subscription_logs = list( RealmAuditLog.objects.filter( modified_user=user_profile, modified_stream_id__in=stream_ids, event_type__in=events
diff --git a/zerver/migrations/0450_backfill_subscription_auditlogs.py b/zerver/migrations/0450_backfill_subscription_auditlogs.py
new file mode 100644
index 0000000000..26807cfe00
--- /dev/null
+++ b/zerver/migrations/0450_backfill_subscription_auditlogs.py
@@ -0,0 +1,151 @@
+# Generated by Django 4.2 on 2023-04-19 18:18
+
+
+from django.db import migrations, transaction
+from django.db.backends.base.schema import BaseDatabaseSchemaEditor
+from django.db.migrations.state import StateApps
+from django.db.models import Max, Min
+from django.utils.timezone import now as timezone_now
+
+
+def backfill_missing_subscriptions(
+    apps: StateApps, schema_editor: BaseDatabaseSchemaEditor
+) -> None:
+    """Backfill subscription realm audit log events for users who have
+    a Subscription row for a stream but no subscription events for it,
+    presumably due to some historical bug.  This is important because
+    those rows are necessary when reactivating a user who is currently
+    soft-deactivated.
+
+    For each stream, we find the users with a Subscription row who have
+    no relevant realm audit log entries, and create a backfilled=True
+    SUBSCRIPTION_CREATED entry which is the latest it could have been,
+    based on UserMessage rows.  If the subscription is not active, we
+    also create a backfilled=True SUBSCRIPTION_DEACTIVATED entry.
+    """
+
+    Stream = apps.get_model("zerver", "Stream")
+    RealmAuditLog = apps.get_model("zerver", "RealmAuditLog")
+    Subscription = apps.get_model("zerver", "Subscription")
+    UserMessage = apps.get_model("zerver", "UserMessage")
+    Message = apps.get_model("zerver", "Message")
+
+    def get_last_message_id() -> int:
+        # We generally use this function to populate RealmAuditLog, and
+        # the max id here is actually system-wide, not per-realm.  I
+        # assume there's some advantage in not filtering by realm.
+        last_id = Message.objects.aggregate(Max("id"))["id__max"]
+        if last_id is None:
+            # During initial realm creation, there might be 0 messages in
+            # the database; in that case, the `aggregate` query returns
+            # None.  Since we want an int for "beginning of time", use -1.
+            last_id = -1
+        return last_id
+
+    for stream in Stream.objects.all():
+        with transaction.atomic():
+            subscribed_user_ids = set(
+                Subscription.objects.filter(recipient_id=stream.recipient_id).values_list(
+                    "user_profile_id", flat=True
+                )
+            )
+            user_ids_in_audit_log = set(
+                RealmAuditLog.objects.filter(
+                    realm=stream.realm,
+                    event_type__in=[
+                        301,  # RealmAuditLog.SUBSCRIPTION_CREATED
+                        302,  # RealmAuditLog.SUBSCRIPTION_ACTIVATED
+                        303,  # RealmAuditLog.SUBSCRIPTION_DEACTIVATED
+                    ],
+                    modified_stream=stream,
+                )
+                .distinct("modified_user_id")
+                .values_list("modified_user_id", flat=True)
+            )
+
+            user_ids_missing_events = subscribed_user_ids - user_ids_in_audit_log
+            if not user_ids_missing_events:
+                continue
+
+            last_message_id = get_last_message_id()
+            now = timezone_now()
+            backfills = []
+            for user_id in sorted(user_ids_missing_events):
+                print(
+                    f"Backfilling subscription event for {user_id} in stream {stream.id} in realm {stream.realm.string_id}"
+                )
+                aggregated = UserMessage.objects.filter(
+                    user_profile_id=user_id,
+                    message__recipient=stream.recipient_id,
+                ).aggregate(
+                    earliest_date=Min("message__date_sent"),
+                    earliest_message_id=Min("message_id"),
+                    latest_date=Max("message__date_sent"),
+                    latest_message_id=Max("message_id"),
+                )
+
+                # Assume we subscribed right before the first message we
+                # saw -- or, if we don't see any, right now.  This makes
+                # this safe for streams which do not have shared history.
+                if aggregated["earliest_message_id"] is not None:
+                    event_last_message_id = aggregated["earliest_message_id"] - 1
+                else:
+                    event_last_message_id = last_message_id
+                if aggregated["earliest_date"] is not None:
+                    event_time = aggregated["earliest_date"]
+                else:
+                    event_time = now
+                log_event = RealmAuditLog(
+                    event_time=event_time,
+                    event_last_message_id=event_last_message_id,
+                    backfilled=True,
+                    event_type=301,  # RealmAuditLog.SUBSCRIPTION_CREATED
+                    realm_id=stream.realm_id,
+                    modified_user_id=user_id,
+                    modified_stream_id=stream.id,
+                )
+                backfills.append(log_event)
+
+                # If the subscription is not active, then we also need
+                # to manufacture a SUBSCRIPTION_DEACTIVATED event,
+                # which we assume to be whenever the last received
+                # UserMessage row was.
+                sub = Subscription.objects.get(
+                    user_profile_id=user_id, recipient_id=stream.recipient_id
+                )
+                if sub.active:
+                    continue
+                if aggregated["latest_message_id"] is not None:
+                    event_last_message_id = aggregated["latest_message_id"]
+                else:
+                    event_last_message_id = last_message_id
+                if aggregated["latest_date"] is not None:
+                    event_time = aggregated["latest_date"]
+                else:
+                    event_time = now
+                deactivated_log_event = RealmAuditLog(
+                    event_time=event_time,
+                    event_last_message_id=event_last_message_id,
+                    backfilled=True,
+                    event_type=303,  # RealmAuditLog.SUBSCRIPTION_DEACTIVATED
+                    realm_id=stream.realm_id,
+                    modified_user_id=user_id,
+                    modified_stream_id=stream.id,
+                )
+                backfills.append(deactivated_log_event)
+
+            RealmAuditLog.objects.bulk_create(backfills)
+
+
+class Migration(migrations.Migration):
+    atomic = False  # No migration-wide transaction; each stream gets its own atomic() block.
+
+    dependencies = [
+        ("zerver", "0449_scheduledmessage_zerver_unsent_scheduled_messages_indexes"),
+    ]
+
+    operations = [
+        migrations.RunPython(
+            backfill_missing_subscriptions, reverse_code=migrations.RunPython.noop, elidable=True
+        )
+    ]