From 610cf6c3713e414995ea1a57110db400ccb88dd2 Mon Sep 17 00:00:00 2001 From: Eugen Rochko Date: Sat, 8 Jul 2023 20:16:48 +0200 Subject: [PATCH] Fix trend calculation working on too many items at a time (#25835) --- app/models/trends/links.rb | 26 +++++++++++++++++++------- app/models/trends/statuses.rb | 26 +++++++++++++++++++------- 2 files changed, 38 insertions(+), 14 deletions(-) diff --git a/app/models/trends/links.rb b/app/models/trends/links.rb index c94f7c0237..fcbdb1a5f5 100644 --- a/app/models/trends/links.rb +++ b/app/models/trends/links.rb @@ -3,6 +3,8 @@ class Trends::Links < Trends::Base PREFIX = 'trending_links' + BATCH_SIZE = 100 + self.default_options = { threshold: 5, review_threshold: 3, @@ -67,8 +69,21 @@ class Trends::Links < Trends::Base end def refresh(at_time = Time.now.utc) - preview_cards = PreviewCard.where(id: (recently_used_ids(at_time) + PreviewCardTrend.pluck(:preview_card_id)).uniq) - calculate_scores(preview_cards, at_time) + # First, recalculate scores for links that were trending previously. We split the queries + # to avoid having to load all of the IDs into Ruby just to send them back into Postgres + PreviewCard.where(id: PreviewCardTrend.select(:preview_card_id)).find_in_batches(batch_size: BATCH_SIZE) do |preview_cards| + calculate_scores(preview_cards, at_time) + end + + # Then, calculate scores for links that were used today. There are potentially some + # duplicate items here that we might process one more time, but that should be fine + PreviewCard.where(id: recently_used_ids(at_time)).find_in_batches(batch_size: BATCH_SIZE) do |preview_cards| + calculate_scores(preview_cards, at_time) + end + + # Now that all trends have up-to-date scores, and all the ones below the threshold have + # been removed, we can recalculate their positions + PreviewCardTrend.connection.exec_update('UPDATE preview_card_trends SET rank = t0.calculated_rank FROM (SELECT id, row_number() OVER w AS calculated_rank FROM preview_card_trends WINDOW w AS (PARTITION BY language ORDER BY score DESC)) t0 WHERE preview_card_trends.id = t0.id') end def request_review @@ -139,10 +154,7 @@ class Trends::Links < Trends::Base to_insert = items.filter { |(score, _)| score >= options[:decay_threshold] } to_delete = items.filter { |(score, _)| score < options[:decay_threshold] } - PreviewCardTrend.transaction do - PreviewCardTrend.upsert_all(to_insert.map { |(score, preview_card)| { preview_card_id: preview_card.id, score: score, language: preview_card.language, allowed: preview_card.trendable? || false } }, unique_by: :preview_card_id) if to_insert.any? - PreviewCardTrend.where(preview_card_id: to_delete.map { |(_, preview_card)| preview_card.id }).delete_all if to_delete.any? - PreviewCardTrend.connection.exec_update('UPDATE preview_card_trends SET rank = t0.calculated_rank FROM (SELECT id, row_number() OVER w AS calculated_rank FROM preview_card_trends WINDOW w AS (PARTITION BY language ORDER BY score DESC)) t0 WHERE preview_card_trends.id = t0.id') - end + PreviewCardTrend.upsert_all(to_insert.map { |(score, preview_card)| { preview_card_id: preview_card.id, score: score, language: preview_card.language, allowed: preview_card.trendable? || false } }, unique_by: :preview_card_id) if to_insert.any? + PreviewCardTrend.where(preview_card_id: to_delete.map { |(_, preview_card)| preview_card.id }).delete_all if to_delete.any? end end diff --git a/app/models/trends/statuses.rb b/app/models/trends/statuses.rb index 84bff9c027..5cd352a6f2 100644 --- a/app/models/trends/statuses.rb +++ b/app/models/trends/statuses.rb @@ -3,6 +3,8 @@ class Trends::Statuses < Trends::Base PREFIX = 'trending_statuses' + BATCH_SIZE = 100 + self.default_options = { threshold: 5, review_threshold: 3, @@ -58,8 +60,21 @@ class Trends::Statuses < Trends::Base end def refresh(at_time = Time.now.utc) - statuses = Status.where(id: (recently_used_ids(at_time) + StatusTrend.pluck(:status_id)).uniq).includes(:status_stat, :account) - calculate_scores(statuses, at_time) + # First, recalculate scores for statuses that were trending previously. We split the queries + # to avoid having to load all of the IDs into Ruby just to send them back into Postgres + Status.where(id: StatusTrend.select(:status_id)).includes(:status_stat, :account).find_in_batches(batch_size: BATCH_SIZE) do |statuses| + calculate_scores(statuses, at_time) + end + + # Then, calculate scores for statuses that were used today. There are potentially some + # duplicate items here that we might process one more time, but that should be fine + Status.where(id: recently_used_ids(at_time)).includes(:status_stat, :account).find_in_batches(batch_size: BATCH_SIZE) do |statuses| + calculate_scores(statuses, at_time) + end + + # Now that all trends have up-to-date scores, and all the ones below the threshold have + # been removed, we can recalculate their positions + StatusTrend.connection.exec_update('UPDATE status_trends SET rank = t0.calculated_rank FROM (SELECT id, row_number() OVER w AS calculated_rank FROM status_trends WINDOW w AS (PARTITION BY language ORDER BY score DESC)) t0 WHERE status_trends.id = t0.id') end def request_review @@ -117,10 +132,7 @@ class Trends::Statuses < Trends::Base to_insert = items.filter { |(score, _)| score >= options[:decay_threshold] } to_delete = items.filter { |(score, _)| score < options[:decay_threshold] } - StatusTrend.transaction do - StatusTrend.upsert_all(to_insert.map { |(score, status)| { status_id: status.id, account_id: status.account_id, score: score, language: status.language, allowed: status.trendable? || false } }, unique_by: :status_id) if to_insert.any? - StatusTrend.where(status_id: to_delete.map { |(_, status)| status.id }).delete_all if to_delete.any? - StatusTrend.connection.exec_update('UPDATE status_trends SET rank = t0.calculated_rank FROM (SELECT id, row_number() OVER w AS calculated_rank FROM status_trends WINDOW w AS (PARTITION BY language ORDER BY score DESC)) t0 WHERE status_trends.id = t0.id') - end + StatusTrend.upsert_all(to_insert.map { |(score, status)| { status_id: status.id, account_id: status.account_id, score: score, language: status.language, allowed: status.trendable? || false } }, unique_by: :status_id) if to_insert.any? + StatusTrend.where(status_id: to_delete.map { |(_, status)| status.id }).delete_all if to_delete.any? end end