From e1acd7b9748ba30ef9bfbb96aa73b3d3382259cf Mon Sep 17 00:00:00 2001 From: Alex Vandiver Date: Thu, 11 Jan 2024 21:49:18 +0000 Subject: [PATCH] process_queue: For threaded workers, create them when they start. Creating the QueueProcessingWorker objects when the ThreadedWorker is created can lead to a race which caused confusing error messages: 1. A thread tries to call `self.worker = get_worker()` 2. This call raises an exception, which is caught by `log_and_exit_if_exception` 3. `log_and_exit_if_exception` sends our process a SIGUSR1, _but otherwise swallows the error_. 4. The thread's `.run()` is called, which tries to access `self.worker`, which was never set, and throws another exception. 5. The process handles the SIGUSR1, restarting. Move the creation of the worker to when it is started, so the worker object does not need to be stored, and possibly have a decoupled failure. --- zerver/management/commands/process_queue.py | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/zerver/management/commands/process_queue.py b/zerver/management/commands/process_queue.py index c10642180a..3f643f6d69 100644 --- a/zerver/management/commands/process_queue.py +++ b/zerver/management/commands/process_queue.py @@ -119,15 +119,13 @@ class ThreadedWorker(threading.Thread): self.logger = logger self.queue_name = queue_name - with log_and_exit_if_exception(logger, queue_name, threaded=True): - self.worker = get_worker(queue_name, threaded=True) - @override def run(self) -> None: with configure_scope() as scope, log_and_exit_if_exception( self.logger, self.queue_name, threaded=True ): - scope.set_tag("queue_worker", self.worker.queue_name) - self.worker.setup() - logging.debug("starting consuming %s", self.worker.queue_name) - self.worker.start() + scope.set_tag("queue_worker", self.queue_name) + worker = get_worker(self.queue_name, threaded=True) + worker.setup() + logging.debug("starting consuming %s", self.queue_name) + worker.start()