Skip to content

Commit 2378613

Browse files
authored
CV2-6172: Prevent duplicate Sidekiq jobs (#2435)
* CV2-6172: add enqueue_at date to skip job execution bases on this date * CV2-6172: fix tests * CV2-6172: cleanup * CV2-6172: skip ES for outdated tasks * CV2-6172: fix tests
1 parent f80b1a1 commit 2378613

6 files changed

Lines changed: 25 additions & 31 deletions

File tree

app/lib/check_elastic_search.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,11 @@ def create_elasticsearch_doc_bg(options)
3030
$repository.refresh_index! if CheckConfig.get('elasticsearch_sync')
3131
end
3232

33-
def update_elasticsearch_doc(keys, data = {}, pm_id = nil, skip_get_data = false)
33+
def update_elasticsearch_doc(keys, data = {}, pm_id = nil, skip_get_data = false, enqueued_at = 0)
3434
return if self.disable_es_callbacks || RequestStore.store[:disable_es_callbacks]
3535
options = { keys: keys, data: data, pm_id: pm_id, skip_get_data: skip_get_data }
3636
model = { klass: self.class.name, id: self.id }
37-
ElasticSearchWorker.perform_in(1.second, YAML::dump(model), YAML::dump(options), 'update_doc')
37+
ElasticSearchWorker.perform_in(1.second, YAML::dump(model), YAML::dump(options), 'update_doc', enqueued_at)
3838
end
3939

4040
def update_recent_activity(obj)

app/models/annotations/dynamic.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ def handle_extracted_text(op)
110110
def handle_report_fields
111111
if self.annotated_type == 'ProjectMedia' && self.annotation_type == 'report_design'
112112
data = { 'report_published_at' => self.data['last_published'], 'report_language' => self.report_design_field_value('language') }
113-
self.update_elasticsearch_doc(data.keys, data, self.annotated_id, true)
113+
self.update_elasticsearch_doc(data.keys, data, self.annotated_id, true, Time.now.utc.to_i)
114114
end
115115
end
116116

@@ -139,7 +139,7 @@ def handle_annotated_by(op)
139139
end
140140
uids.uniq!
141141
Rails.cache.write(key, uids)
142-
task.update_elasticsearch_doc(['annotated_by'], { 'annotated_by' => uids }, pm.id, true)
142+
task.update_elasticsearch_doc(['annotated_by'], { 'annotated_by' => uids }, pm.id, true, Time.now.utc.to_i)
143143
end
144144
end
145145
end

app/models/workflow/concerns/dynamic_annotation_field_concern.rb

Lines changed: 1 addition & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -26,27 +26,7 @@ def index_on_es_background
2626
obj.update_columns(updated_at: updated_at)
2727
data['updated_at'] = updated_at.utc
2828
end
29-
self.update_elasticsearch_doc(data.keys, data, obj.id)
30-
end
31-
end
32-
33-
def index_on_es_foreground
34-
return if self.disable_es_callbacks || RequestStore.store[:disable_es_callbacks]
35-
obj = self&.annotation&.annotated
36-
if !obj.nil? && obj.class.name == 'ProjectMedia'
37-
data = { self.annotation_type => self.value }
38-
if User.current.present?
39-
updated_at = Time.now
40-
obj.update_columns(updated_at: updated_at)
41-
data['updated_at'] = updated_at.utc
42-
end
43-
options = {
44-
keys: data.keys,
45-
data: data,
46-
pm_id: obj.id,
47-
doc_id: Base64.encode64("#{obj.class.name}/#{obj.id}")
48-
}
49-
self.update_elasticsearch_doc_bg(options)
29+
self.update_elasticsearch_doc(data.keys, data, obj.id, false, Time.now.utc.to_i)
5030
end
5131
end
5232

app/models/workflow/verification_status.rb

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,7 @@ class Workflow::VerificationStatus < Workflow::Base
33
check_default_project_media_workflow
44

55
check_workflow from: :any, to: :any, actions: [:check_if_item_is_published, :apply_rules, :update_report_design_if_needed, :replicate_status_to_children]
6-
check_workflow on: :create, actions: :index_on_es_background
7-
check_workflow on: :update, actions: :index_on_es_foreground
6+
check_workflow on: :save, actions: :index_on_es_background
87

98
def self.core_default_value
109
'undetermined'

app/workers/elastic_search_worker.rb

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ class ElasticSearchWorker
66

77
sidekiq_retry_in { |_count, _e| 3 }
88

9-
def perform(model_data, options, type)
9+
def perform(model_data, options, type, enqueued_at = 0)
1010
model_data = begin YAML::load(model_data) rescue nil end
1111
unless model_data.nil?
1212
model = model_data[:klass].constantize.find_by_id model_data[:id]
@@ -15,7 +15,6 @@ def perform(model_data, options, type)
1515
ops = {
1616
'create_doc' => 'create_elasticsearch_doc_bg',
1717
'update_doc' => 'update_elasticsearch_doc_bg',
18-
'update_doc_team' => 'update_elasticsearch_doc_team_bg',
1918
'create_update_doc_nested' => 'create_update_nested_obj_bg',
2019
'destroy_doc' => 'destroy_elasticsearch_doc',
2120
'destroy_doc_nested' => 'destroy_elasticsearch_doc_nested',
@@ -25,7 +24,7 @@ def perform(model_data, options, type)
2524
options[:model_id] = model_data[:id]
2625
model_data[:klass].constantize.send(ops[type],options)
2726
else
28-
model.send(ops[type], options)
27+
model.send(ops[type], options) if enqueued_at == 0 || enqueued_at >= model.updated_at.to_i
2928
end
3029
end
3130
end
@@ -37,7 +36,7 @@ def perform(model_data, options, type)
3736
def should_perform_es_action?(type, options, model, op)
3837
# Verify that object still exists in PG (should skip destroy operation)
3938
action = false
40-
if ['destroy_doc', 'destroy_doc_nested', 'update_doc_team'].include?(type)
39+
if ['destroy_doc', 'destroy_doc_nested'].include?(type)
4140
action = true
4241
elsif !options[:doc_id].blank? && !options[:pm_id].nil?
4342
action = ProjectMedia.exists?(options[:pm_id]) && model.respond_to?(op)

test/controllers/elastic_search_5_test.rb

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,8 @@ def setup
106106

107107
test "should create update elasticsearch status" do
108108
m = create_valid_media
109+
s = nil
110+
pm = nil
109111
Sidekiq::Testing.inline! do
110112
pm = create_project_media media: m, disable_es_callbacks: false
111113
sleep 2
@@ -119,6 +121,20 @@ def setup
119121
ms = $repository.find(get_es_id(pm))
120122
assert_equal 'verified', ms['verification_status']
121123
end
124+
assert_equal 'verified', s.reload.status
125+
Sidekiq::Testing.fake! do
126+
s.status = 'in_progress'
127+
s.save!
128+
end
129+
sleep 2
130+
Sidekiq::Testing.inline! do
131+
s.status = 'false'
132+
s.save!
133+
end
134+
Sidekiq::Worker.drain_all
135+
sleep 2
136+
ms = $repository.find(get_es_id(pm))
137+
assert_equal 'false', ms['verification_status']
122138
end
123139

124140
test "should create parent if not exists" do

0 commit comments

Comments
 (0)