diff --git a/CHANGELOG.md b/CHANGELOG.md index 7815bb5849..1643049605 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ * [FEATURE] Memberlist: Add `-memberlist.cluster-label` and `-memberlist.cluster-label-verification-disabled` to prevent accidental cross-cluster gossip joins and support rolling label rollout. #7385 * [FEATURE] Querier: Add timeout classification to classify query timeouts as 4XX (user error) or 5XX (system error) based on phase timing. When enabled, queries that spend most of their time in PromQL evaluation return `422 Unprocessable Entity` instead of `503 Service Unavailable`. #7374 * [FEATURE] Querier: Implement Resource Based Throttling in Querier. #7442 +* [FEATURE] Querier: Add resource-based query eviction that automatically cancels the heaviest running query when CPU or heap utilization exceeds configured thresholds. #7488 * [ENHANCEMENT] Parquet Converter: Add a ring status page to expose the ring status. #7455 * [ENHANCEMENT] Ingester: Add WAL record metrics to help evaluate the effectiveness of WAL compression type (e.g. snappy, zstd): `cortex_ingester_tsdb_wal_record_part_writes_total`, `cortex_ingester_tsdb_wal_record_parts_bytes_written_total`, and `cortex_ingester_tsdb_wal_record_bytes_saved_total`. #7420 * [ENHANCEMENT] Distributor: Introduce dynamic `Symbols` slice capacity pooling. #7398 #7401 diff --git a/docs/blocks-storage/querier.md b/docs/blocks-storage/querier.md index 24bc6a4c3a..b633ddbc8f 100644 --- a/docs/blocks-storage/querier.md +++ b/docs/blocks-storage/querier.md @@ -330,6 +330,43 @@ querier: # type. 0 to disable. # CLI flag: -querier.query-protection.rejection.threshold.heap-utilization [heap_utilization: | default = 0] + + eviction: + threshold: + # EXPERIMENTAL: Max CPU utilization that this instance can reach before + # evicting the heaviest running query (across all tenants) in + # percentage, between 0 and 1. monitored_resources config must include + # the resource type. 0 to disable. 
+ # CLI flag: -querier.query-protection.eviction.threshold.cpu-utilization + [cpu_utilization: | default = 0] + + # EXPERIMENTAL: Max heap utilization that this instance can reach before + # evicting the heaviest running query (across all tenants) in + # percentage, between 0 and 1. monitored_resources config must include + # the resource type. 0 to disable. + # CLI flag: -querier.query-protection.eviction.threshold.heap-utilization + [heap_utilization: | default = 0] + + # EXPERIMENTAL: How frequently the evictor checks system resource + # utilization. + # CLI flag: -querier.query-protection.eviction.check-interval + [check_interval: | default = 1s] + + # EXPERIMENTAL: Number of check intervals to wait after an eviction before + # evicting again. + # CLI flag: -querier.query-protection.eviction.cooldown-period + [cooldown_period: | default = 3] + + # EXPERIMENTAL: The query metric used to determine the heaviest query for + # eviction. Supported values: fetched_samples, fetched_series, + # fetched_chunks, fetched_chunk_bytes. + # CLI flag: -querier.query-protection.eviction.eviction-metric + [eviction_metric: | default = "fetched_samples"] + + # EXPERIMENTAL: Minimum time a query must be running before it becomes + # eligible for eviction. Queries younger than this are ignored. + # CLI flag: -querier.query-protection.eviction.min-query-age + [min_query_age: | default = 10s] ``` ### `blocks_storage_config` diff --git a/docs/blocks-storage/store-gateway.md b/docs/blocks-storage/store-gateway.md index 965a9089f2..59250604fa 100644 --- a/docs/blocks-storage/store-gateway.md +++ b/docs/blocks-storage/store-gateway.md @@ -372,6 +372,43 @@ store_gateway: # CLI flag: -store-gateway.query-protection.rejection.threshold.heap-utilization [heap_utilization: | default = 0] + eviction: + threshold: + # EXPERIMENTAL: Max CPU utilization that this instance can reach before + # evicting the heaviest running query (across all tenants) in + # percentage, between 0 and 1. 
monitored_resources config must include + # the resource type. 0 to disable. + # CLI flag: -store-gateway.query-protection.eviction.threshold.cpu-utilization + [cpu_utilization: | default = 0] + + # EXPERIMENTAL: Max heap utilization that this instance can reach before + # evicting the heaviest running query (across all tenants) in + # percentage, between 0 and 1. monitored_resources config must include + # the resource type. 0 to disable. + # CLI flag: -store-gateway.query-protection.eviction.threshold.heap-utilization + [heap_utilization: | default = 0] + + # EXPERIMENTAL: How frequently the evictor checks system resource + # utilization. + # CLI flag: -store-gateway.query-protection.eviction.check-interval + [check_interval: | default = 1s] + + # EXPERIMENTAL: Number of check intervals to wait after an eviction before + # evicting again. + # CLI flag: -store-gateway.query-protection.eviction.cooldown-period + [cooldown_period: | default = 3] + + # EXPERIMENTAL: The query metric used to determine the heaviest query for + # eviction. Supported values: fetched_samples, fetched_series, + # fetched_chunks, fetched_chunk_bytes. + # CLI flag: -store-gateway.query-protection.eviction.eviction-metric + [eviction_metric: | default = "fetched_samples"] + + # EXPERIMENTAL: Minimum time a query must be running before it becomes + # eligible for eviction. Queries younger than this are ignored. + # CLI flag: -store-gateway.query-protection.eviction.min-query-age + [min_query_age: | default = 10s] + hedged_request: # If true, hedged requests are applied to object store calls. It can help # with reducing tail latency. diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index f35e99c327..e6ca704464 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -3872,6 +3872,43 @@ query_protection: # disable. 
# CLI flag: -ingester.query-protection.rejection.threshold.heap-utilization [heap_utilization: | default = 0] + + eviction: + threshold: + # EXPERIMENTAL: Max CPU utilization that this instance can reach before + # evicting the heaviest running query (across all tenants) in percentage, + # between 0 and 1. monitored_resources config must include the resource + # type. 0 to disable. + # CLI flag: -ingester.query-protection.eviction.threshold.cpu-utilization + [cpu_utilization: | default = 0] + + # EXPERIMENTAL: Max heap utilization that this instance can reach before + # evicting the heaviest running query (across all tenants) in percentage, + # between 0 and 1. monitored_resources config must include the resource + # type. 0 to disable. + # CLI flag: -ingester.query-protection.eviction.threshold.heap-utilization + [heap_utilization: | default = 0] + + # EXPERIMENTAL: How frequently the evictor checks system resource + # utilization. + # CLI flag: -ingester.query-protection.eviction.check-interval + [check_interval: | default = 1s] + + # EXPERIMENTAL: Number of check intervals to wait after an eviction before + # evicting again. + # CLI flag: -ingester.query-protection.eviction.cooldown-period + [cooldown_period: | default = 3] + + # EXPERIMENTAL: The query metric used to determine the heaviest query for + # eviction. Supported values: fetched_samples, fetched_series, + # fetched_chunks, fetched_chunk_bytes. + # CLI flag: -ingester.query-protection.eviction.eviction-metric + [eviction_metric: | default = "fetched_samples"] + + # EXPERIMENTAL: Minimum time a query must be running before it becomes + # eligible for eviction. Queries younger than this are ignored. + # CLI flag: -ingester.query-protection.eviction.min-query-age + [min_query_age: | default = 10s] ``` ### `ingester_client_config` @@ -5016,6 +5053,43 @@ query_protection: # disable. 
# CLI flag: -querier.query-protection.rejection.threshold.heap-utilization [heap_utilization: | default = 0] + + eviction: + threshold: + # EXPERIMENTAL: Max CPU utilization that this instance can reach before + # evicting the heaviest running query (across all tenants) in percentage, + # between 0 and 1. monitored_resources config must include the resource + # type. 0 to disable. + # CLI flag: -querier.query-protection.eviction.threshold.cpu-utilization + [cpu_utilization: | default = 0] + + # EXPERIMENTAL: Max heap utilization that this instance can reach before + # evicting the heaviest running query (across all tenants) in percentage, + # between 0 and 1. monitored_resources config must include the resource + # type. 0 to disable. + # CLI flag: -querier.query-protection.eviction.threshold.heap-utilization + [heap_utilization: | default = 0] + + # EXPERIMENTAL: How frequently the evictor checks system resource + # utilization. + # CLI flag: -querier.query-protection.eviction.check-interval + [check_interval: | default = 1s] + + # EXPERIMENTAL: Number of check intervals to wait after an eviction before + # evicting again. + # CLI flag: -querier.query-protection.eviction.cooldown-period + [cooldown_period: | default = 3] + + # EXPERIMENTAL: The query metric used to determine the heaviest query for + # eviction. Supported values: fetched_samples, fetched_series, + # fetched_chunks, fetched_chunk_bytes. + # CLI flag: -querier.query-protection.eviction.eviction-metric + [eviction_metric: | default = "fetched_samples"] + + # EXPERIMENTAL: Minimum time a query must be running before it becomes + # eligible for eviction. Queries younger than this are ignored. 
+ # CLI flag: -querier.query-protection.eviction.min-query-age + [min_query_age: | default = 10s] ``` ### `query_frontend_config` @@ -6785,6 +6859,43 @@ query_protection: # CLI flag: -store-gateway.query-protection.rejection.threshold.heap-utilization [heap_utilization: | default = 0] + eviction: + threshold: + # EXPERIMENTAL: Max CPU utilization that this instance can reach before + # evicting the heaviest running query (across all tenants) in percentage, + # between 0 and 1. monitored_resources config must include the resource + # type. 0 to disable. + # CLI flag: -store-gateway.query-protection.eviction.threshold.cpu-utilization + [cpu_utilization: | default = 0] + + # EXPERIMENTAL: Max heap utilization that this instance can reach before + # evicting the heaviest running query (across all tenants) in percentage, + # between 0 and 1. monitored_resources config must include the resource + # type. 0 to disable. + # CLI flag: -store-gateway.query-protection.eviction.threshold.heap-utilization + [heap_utilization: | default = 0] + + # EXPERIMENTAL: How frequently the evictor checks system resource + # utilization. + # CLI flag: -store-gateway.query-protection.eviction.check-interval + [check_interval: | default = 1s] + + # EXPERIMENTAL: Number of check intervals to wait after an eviction before + # evicting again. + # CLI flag: -store-gateway.query-protection.eviction.cooldown-period + [cooldown_period: | default = 3] + + # EXPERIMENTAL: The query metric used to determine the heaviest query for + # eviction. Supported values: fetched_samples, fetched_series, + # fetched_chunks, fetched_chunk_bytes. + # CLI flag: -store-gateway.query-protection.eviction.eviction-metric + [eviction_metric: | default = "fetched_samples"] + + # EXPERIMENTAL: Minimum time a query must be running before it becomes + # eligible for eviction. Queries younger than this are ignored. 
+ # CLI flag: -store-gateway.query-protection.eviction.min-query-age + [min_query_age: | default = 10s] + hedged_request: # If true, hedged requests are applied to object store calls. It can help with # reducing tail latency. diff --git a/docs/configuration/v1-guarantees.md b/docs/configuration/v1-guarantees.md index be3ee78ce5..105ab8ece2 100644 --- a/docs/configuration/v1-guarantees.md +++ b/docs/configuration/v1-guarantees.md @@ -129,3 +129,10 @@ Currently experimental features are: - `-validation.max-label-cardinality-for-unoptimized-regex` (int) - maximum label cardinality - `-validation.max-total-label-value-length-for-unoptimized-regex` (int) - maximum total length of all label values in bytes - HATracker: `-distributor.ha-tracker.enable-startup-sync` (bool) - If enabled, fetches all tracked keys on startup to populate the local cache. +- Querier: Resource-based query eviction + - `-querier.query-protection.eviction.threshold.cpu-utilization` (float) + - `-querier.query-protection.eviction.threshold.heap-utilization` (float) + - `-querier.query-protection.eviction.check-interval` (duration) + - `-querier.query-protection.eviction.cooldown-period` (int) + - `-querier.query-protection.eviction.eviction-metric` (string) + - `-querier.query-protection.eviction.min-query-age` (duration) diff --git a/pkg/configs/query_protection.go b/pkg/configs/query_protection.go index 7b48e9b2de..e2b1aa2b1a 100644 --- a/pkg/configs/query_protection.go +++ b/pkg/configs/query_protection.go @@ -3,47 +3,115 @@ package configs import ( "errors" "flag" + "fmt" "strings" + "time" "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/cortexproject/cortex/pkg/util/resource" ) +// recognizedEvictionMetrics lists the valid values for eviction_metric. 
+var recognizedEvictionMetrics = map[string]bool{ + "fetched_samples": true, + "fetched_series": true, + "fetched_chunks": true, + "fetched_chunk_bytes": true, +} + type QueryProtection struct { - Rejection rejection `json:"rejection"` + Rejection rejection `json:"rejection"` + Eviction EvictionConfig `yaml:"eviction"` } type rejection struct { - Threshold threshold `yaml:"threshold"` + Threshold Threshold `yaml:"threshold"` } -type threshold struct { +// Threshold holds CPU and heap utilization thresholds (0-1 range). +type Threshold struct { CPUUtilization float64 `yaml:"cpu_utilization"` HeapUtilization float64 `yaml:"heap_utilization"` } +// EvictionConfig configures the resource-based query evictor. +type EvictionConfig struct { + Threshold Threshold `yaml:"threshold"` + CheckInterval time.Duration `yaml:"check_interval"` + CooldownPeriod int `yaml:"cooldown_period"` + EvictionMetric string `yaml:"eviction_metric"` + MinQueryAge time.Duration `yaml:"min_query_age"` +} + +// Enabled returns true when at least one eviction threshold is greater than 0. +func (c EvictionConfig) Enabled() bool { + return c.Threshold.CPUUtilization > 0 || c.Threshold.HeapUtilization > 0 +} + func (cfg *QueryProtection) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) { + // Rejection flags f.Float64Var(&cfg.Rejection.Threshold.CPUUtilization, prefix+"query-protection.rejection.threshold.cpu-utilization", 0, "EXPERIMENTAL: Max CPU utilization that this instance can reach before rejecting new query request (across all tenants) in percentage, between 0 and 1. monitored_resources config must include the resource type. 0 to disable.") f.Float64Var(&cfg.Rejection.Threshold.HeapUtilization, prefix+"query-protection.rejection.threshold.heap-utilization", 0, "EXPERIMENTAL: Max heap utilization that this instance can reach before rejecting new query request (across all tenants) in percentage, between 0 and 1. monitored_resources config must include the resource type. 
0 to disable.") + + // Eviction flags + f.Float64Var(&cfg.Eviction.Threshold.CPUUtilization, prefix+"query-protection.eviction.threshold.cpu-utilization", 0, "EXPERIMENTAL: Max CPU utilization that this instance can reach before evicting the heaviest running query (across all tenants) in percentage, between 0 and 1. monitored_resources config must include the resource type. 0 to disable.") + f.Float64Var(&cfg.Eviction.Threshold.HeapUtilization, prefix+"query-protection.eviction.threshold.heap-utilization", 0, "EXPERIMENTAL: Max heap utilization that this instance can reach before evicting the heaviest running query (across all tenants) in percentage, between 0 and 1. monitored_resources config must include the resource type. 0 to disable.") + f.DurationVar(&cfg.Eviction.CheckInterval, prefix+"query-protection.eviction.check-interval", 1*time.Second, "EXPERIMENTAL: How frequently the evictor checks system resource utilization.") + f.IntVar(&cfg.Eviction.CooldownPeriod, prefix+"query-protection.eviction.cooldown-period", 3, "EXPERIMENTAL: Number of check intervals to wait after an eviction before evicting again.") + f.StringVar(&cfg.Eviction.EvictionMetric, prefix+"query-protection.eviction.eviction-metric", "fetched_samples", "EXPERIMENTAL: The query metric used to determine the heaviest query for eviction. Supported values: fetched_samples, fetched_series, fetched_chunks, fetched_chunk_bytes.") + f.DurationVar(&cfg.Eviction.MinQueryAge, prefix+"query-protection.eviction.min-query-age", 10*time.Second, "EXPERIMENTAL: Minimum time a query must be running before it becomes eligible for eviction. 
Queries younger than this are ignored.") } func (cfg *QueryProtection) Validate(monitoredResources flagext.StringSliceCSV) error { - thresholdCfg := cfg.Rejection.Threshold - if thresholdCfg.CPUUtilization > 1 || thresholdCfg.CPUUtilization < 0 { + // Validate rejection thresholds + rejThreshold := cfg.Rejection.Threshold + if rejThreshold.CPUUtilization > 1 || rejThreshold.CPUUtilization < 0 { return errors.New("cpu_utilization must be between 0 and 1") } - if thresholdCfg.CPUUtilization > 0 && !strings.Contains(monitoredResources.String(), string(resource.CPU)) { + if rejThreshold.CPUUtilization > 0 && !strings.Contains(monitoredResources.String(), string(resource.CPU)) { return errors.New("monitored_resources config must include \"cpu\" as well") } - if thresholdCfg.HeapUtilization > 1 || thresholdCfg.HeapUtilization < 0 { + if rejThreshold.HeapUtilization > 1 || rejThreshold.HeapUtilization < 0 { return errors.New("heap_utilization must be between 0 and 1") } - if thresholdCfg.HeapUtilization > 0 && !strings.Contains(monitoredResources.String(), string(resource.Heap)) { + if rejThreshold.HeapUtilization > 0 && !strings.Contains(monitoredResources.String(), string(resource.Heap)) { return errors.New("monitored_resources config must include \"heap\" as well") } + // Validate eviction thresholds + evThreshold := cfg.Eviction.Threshold + if evThreshold.CPUUtilization > 1 || evThreshold.CPUUtilization < 0 { + return errors.New("eviction cpu_utilization must be between 0 and 1") + } + + if evThreshold.HeapUtilization > 1 || evThreshold.HeapUtilization < 0 { + return errors.New("eviction heap_utilization must be between 0 and 1") + } + + if cfg.Eviction.Enabled() { + if cfg.Eviction.CheckInterval <= 0 { + return errors.New("eviction check_interval must be greater than 0 when eviction is enabled") + } + + if cfg.Eviction.CooldownPeriod < 0 { + return errors.New("eviction cooldown_period must be >= 0") + } + + if !recognizedEvictionMetrics[cfg.Eviction.EvictionMetric] { 
+ return fmt.Errorf("unrecognized eviction_metric %q; supported values: fetched_samples, fetched_series, fetched_chunks, fetched_chunk_bytes", cfg.Eviction.EvictionMetric) + } + + if evThreshold.CPUUtilization > 0 && !strings.Contains(monitoredResources.String(), string(resource.CPU)) { + return errors.New("monitored_resources config must include \"cpu\" when eviction cpu threshold is set") + } + + if evThreshold.HeapUtilization > 0 && !strings.Contains(monitoredResources.String(), string(resource.Heap)) { + return errors.New("monitored_resources config must include \"heap\" when eviction heap threshold is set") + } + } + return nil } diff --git a/pkg/configs/query_protection_test.go b/pkg/configs/query_protection_test.go index f06b4be7f3..ce92399208 100644 --- a/pkg/configs/query_protection_test.go +++ b/pkg/configs/query_protection_test.go @@ -2,9 +2,14 @@ package configs import ( "errors" + "flag" "testing" + "time" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "github.com/cortexproject/cortex/pkg/util/flagext" ) func Test_Validate(t *testing.T) { @@ -16,7 +21,7 @@ func Test_Validate(t *testing.T) { "correct config should pass validation": { queryProtection: QueryProtection{ Rejection: rejection{ - Threshold: threshold{ + Threshold: Threshold{ CPUUtilization: 0.5, HeapUtilization: 0.5, }, @@ -28,7 +33,7 @@ func Test_Validate(t *testing.T) { "utilization config less than 0 should fail validation": { queryProtection: QueryProtection{ Rejection: rejection{ - Threshold: threshold{ + Threshold: Threshold{ CPUUtilization: -0.5, HeapUtilization: 0.5, }, @@ -40,7 +45,7 @@ func Test_Validate(t *testing.T) { "utilization config greater than 1 should fail validation": { queryProtection: QueryProtection{ Rejection: rejection{ - Threshold: threshold{ + Threshold: Threshold{ CPUUtilization: 0.5, HeapUtilization: 1.5, }, @@ -52,7 +57,7 @@ func Test_Validate(t *testing.T) { "missing cpu in monitored_resources config should fail validation": { 
queryProtection: QueryProtection{ Rejection: rejection{ - Threshold: threshold{ + Threshold: Threshold{ CPUUtilization: 0.5, }, }, @@ -63,7 +68,7 @@ func Test_Validate(t *testing.T) { "missing heap in monitored_resources config should fail validation": { queryProtection: QueryProtection{ Rejection: rejection{ - Threshold: threshold{ + Threshold: Threshold{ HeapUtilization: 0.5, }, }, @@ -73,7 +78,7 @@ func Test_Validate(t *testing.T) { }, } { t.Run(name, func(t *testing.T) { - err := tc.queryProtection.Validate(tc.monitoredResources) + err := tc.queryProtection.Validate(flagext.StringSliceCSV(tc.monitoredResources)) if tc.err != nil { require.Errorf(t, err, tc.err.Error()) } else { @@ -82,3 +87,72 @@ func Test_Validate(t *testing.T) { }) } } + +func Test_EvictionConfig_Enabled(t *testing.T) { + assert.False(t, EvictionConfig{}.Enabled()) + assert.True(t, EvictionConfig{Threshold: Threshold{CPUUtilization: 0.8}}.Enabled()) + assert.True(t, EvictionConfig{Threshold: Threshold{HeapUtilization: 0.85}}.Enabled()) + assert.True(t, EvictionConfig{Threshold: Threshold{CPUUtilization: 0.8, HeapUtilization: 0.85}}.Enabled()) +} + +func Test_EvictionConfig_Validation(t *testing.T) { + validBase := func() QueryProtection { + return QueryProtection{ + Eviction: EvictionConfig{ + Threshold: Threshold{CPUUtilization: 0.8, HeapUtilization: 0.85}, + CheckInterval: 1 * time.Second, + CooldownPeriod: 3, + EvictionMetric: "fetched_samples", + }, + } + } + + tests := map[string]struct { + modify func(*QueryProtection) + monitoredResources []string + err string + }{ + "valid config passes": {func(qp *QueryProtection) {}, []string{"cpu", "heap"}, ""}, + "cpu > 1 fails": {func(qp *QueryProtection) { qp.Eviction.Threshold.CPUUtilization = 1.5 }, []string{"cpu", "heap"}, "eviction cpu_utilization must be between 0 and 1"}, + "cpu < 0 fails": {func(qp *QueryProtection) { qp.Eviction.Threshold.CPUUtilization = -0.1 }, []string{"cpu", "heap"}, "eviction cpu_utilization must be between 0 and 
1"}, + "heap > 1 fails": {func(qp *QueryProtection) { qp.Eviction.Threshold.HeapUtilization = 2.0 }, []string{"cpu", "heap"}, "eviction heap_utilization must be between 0 and 1"}, + "heap < 0 fails": {func(qp *QueryProtection) { qp.Eviction.Threshold.HeapUtilization = -0.5 }, []string{"cpu", "heap"}, "eviction heap_utilization must be between 0 and 1"}, + "check_interval 0 fails": {func(qp *QueryProtection) { qp.Eviction.CheckInterval = 0 }, []string{"cpu", "heap"}, "eviction check_interval must be greater than 0 when eviction is enabled"}, + "cooldown < 0 fails": {func(qp *QueryProtection) { qp.Eviction.CooldownPeriod = -1 }, []string{"cpu", "heap"}, "eviction cooldown_period must be >= 0"}, + "unknown metric fails": {func(qp *QueryProtection) { qp.Eviction.EvictionMetric = "unknown" }, []string{"cpu", "heap"}, `unrecognized eviction_metric "unknown"; supported values: fetched_samples, fetched_series, fetched_chunks, fetched_chunk_bytes`}, + "cpu without monitored fails": {func(qp *QueryProtection) {}, []string{"heap"}, `monitored_resources config must include "cpu" when eviction cpu threshold is set`}, + "heap without monitored fails": {func(qp *QueryProtection) {}, []string{"cpu"}, `monitored_resources config must include "heap" when eviction heap threshold is set`}, + "cooldown 0 is valid": {func(qp *QueryProtection) { qp.Eviction.CooldownPeriod = 0 }, []string{"cpu", "heap"}, ""}, + "disabled skips interval check": {func(qp *QueryProtection) { + qp.Eviction.Threshold = Threshold{} + qp.Eviction.CheckInterval = 0 + }, []string{}, ""}, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + qp := validBase() + tc.modify(&qp) + err := qp.Validate(flagext.StringSliceCSV(tc.monitoredResources)) + if tc.err != "" { + require.EqualError(t, err, tc.err) + } else { + require.NoError(t, err) + } + }) + } +} + +func Test_RegisterFlagsWithPrefix_EvictionDefaults(t *testing.T) { + var cfg QueryProtection + fs := flag.NewFlagSet("test", 
flag.ContinueOnError) + cfg.RegisterFlagsWithPrefix(fs, "querier.") + require.NoError(t, fs.Parse([]string{})) + + assert.Equal(t, float64(0), cfg.Eviction.Threshold.CPUUtilization) + assert.Equal(t, float64(0), cfg.Eviction.Threshold.HeapUtilization) + assert.Equal(t, 1*time.Second, cfg.Eviction.CheckInterval) + assert.Equal(t, 3, cfg.Eviction.CooldownPeriod) + assert.Equal(t, "fetched_samples", cfg.Eviction.EvictionMetric) + assert.False(t, cfg.Eviction.Enabled()) +} diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index f24d2db000..fd6d9c7389 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -284,7 +284,8 @@ func (t *Cortex) initQueryable() (serv services.Service, err error) { querierRegisterer := prometheus.WrapRegistererWith(prometheus.Labels{"engine": "querier"}, prometheus.DefaultRegisterer) // Create a querier queryable and PromQL engine - t.QuerierQueryable, t.ExemplarQueryable, t.QuerierEngine = querier.New(t.Cfg.Querier, t.OverridesConfig, t.Distributor, t.StoreQueryables, querierRegisterer, util_log.Logger, t.OverridesConfig.QueryPartialData, t.ResourceMonitor) + var evictorService services.Service + t.QuerierQueryable, t.ExemplarQueryable, t.QuerierEngine, evictorService = querier.New(t.Cfg.Querier, t.OverridesConfig, t.Distributor, t.StoreQueryables, querierRegisterer, util_log.Logger, t.OverridesConfig.QueryPartialData, t.ResourceMonitor) // Use distributor as default MetadataQuerier t.MetadataQuerier = t.Distributor @@ -292,7 +293,7 @@ func (t *Cortex) initQueryable() (serv services.Service, err error) { // Register the default endpoints that are always enabled for the querier module t.API.RegisterQueryable(t.QuerierQueryable, t.Distributor) - return nil, nil + return evictorService, nil } // Enable merge querier if multi tenant query federation is enabled @@ -701,7 +702,7 @@ func (t *Cortex) initRuler() (serv services.Service, err error) { queryEngine = engine.New(opts, t.Cfg.Ruler.ThanosEngine, rulerRegisterer) } else { 
// TODO: Consider wrapping logger to differentiate from querier module logger - queryable, _, queryEngine = querier.New(t.Cfg.Querier, t.OverridesConfig, t.Distributor, t.StoreQueryables, rulerRegisterer, util_log.Logger, t.OverridesConfig.RulesPartialData, nil) + queryable, _, queryEngine, _ = querier.New(t.Cfg.Querier, t.OverridesConfig, t.Distributor, t.StoreQueryables, rulerRegisterer, util_log.Logger, t.OverridesConfig.RulesPartialData, nil) } managerFactory := ruler.DefaultTenantManagerFactory(t.Cfg.Ruler, pusher, queryable, queryEngine, t.OverridesConfig, metrics, prometheus.DefaultRegisterer) diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 8c701283bc..064a8ca309 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -33,7 +33,9 @@ import ( "github.com/cortexproject/cortex/pkg/util/limiter" util_log "github.com/cortexproject/cortex/pkg/util/log" "github.com/cortexproject/cortex/pkg/util/parquetutil" + "github.com/cortexproject/cortex/pkg/util/queryeviction" "github.com/cortexproject/cortex/pkg/util/resource" + "github.com/cortexproject/cortex/pkg/util/services" "github.com/cortexproject/cortex/pkg/util/spanlogger" "github.com/cortexproject/cortex/pkg/util/users" "github.com/cortexproject/cortex/pkg/util/validation" @@ -233,7 +235,7 @@ func getChunksIteratorFunction(_ Config) chunkIteratorFunc { } // New builds a queryable and promql engine. 
-func New(cfg Config, limits *validation.Overrides, distributor Distributor, stores []QueryableWithFilter, reg prometheus.Registerer, logger log.Logger, isPartialDataEnabled partialdata.IsCfgEnabledFunc, resourceMonitor resource.IMonitor) (storage.SampleAndChunkQueryable, storage.ExemplarQueryable, engine.QueryEngine) { +func New(cfg Config, limits *validation.Overrides, distributor Distributor, stores []QueryableWithFilter, reg prometheus.Registerer, logger log.Logger, isPartialDataEnabled partialdata.IsCfgEnabledFunc, resourceMonitor resource.IMonitor) (storage.SampleAndChunkQueryable, storage.ExemplarQueryable, engine.QueryEngine, services.Service) { iteratorFunc := getChunksIteratorFunction(cfg) // Create resource-based limiter if resource monitor is available and thresholds are configured. @@ -255,6 +257,36 @@ func New(cfg Config, limits *validation.Overrides, distributor Distributor, stor } } + // Set up query eviction if configured. + var queryRegistry *queryeviction.QueryRegistry + var queryEvictor *queryeviction.QueryEvictor + + evictionCfg := cfg.QueryProtection.Eviction + if evictionCfg.Enabled() && resourceMonitor != nil { + evCfg := queryeviction.EvictionConfig{ + CPUUtilization: evictionCfg.Threshold.CPUUtilization, + HeapUtilization: evictionCfg.Threshold.HeapUtilization, + CheckInterval: evictionCfg.CheckInterval, + CooldownPeriod: evictionCfg.CooldownPeriod, + EvictionMetric: evictionCfg.EvictionMetric, + MinQueryAge: evictionCfg.MinQueryAge, + } + + metricFunc, err := queryeviction.ResolveMetricFunc(evCfg.EvictionMetric) + if err != nil { + level.Error(logger).Log("msg", "invalid eviction metric", "err", err) + } else { + queryRegistry = queryeviction.NewQueryRegistry(metricFunc) + queryEvictor, err = queryeviction.NewQueryEvictor( + resourceMonitor, queryRegistry, evCfg, + logger, reg, "querier", + ) + if err != nil { + level.Error(logger).Log("msg", "failed to create query evictor", "err", err) + } + } + } + distributorQueryable := 
newDistributorQueryable(distributor, cfg.IngesterMetadataStreaming, cfg.IngesterLabelNamesWithMatchers, iteratorFunc, isPartialDataEnabled, cfg.IngesterQueryMaxAttempts, limits, nil) ns := make([]QueryableWithFilter, len(stores)) @@ -298,7 +330,19 @@ func New(cfg Config, limits *validation.Overrides, distributor Distributor, stor }, } queryEngine := engine.New(opts, cfg.ThanosEngine, reg) - return NewSampleAndChunkQueryable(lazyQueryable), exemplarQueryable, queryEngine + + // Wrap the engine with eviction support if the registry was created. + var eng engine.QueryEngine = queryEngine + if queryRegistry != nil { + eng = queryeviction.NewResourceEvictingEngine(queryEngine, queryRegistry) + } + + // Return the evictor as a service so the caller can manage its lifecycle. + var evictorService services.Service + if queryEvictor != nil { + evictorService = queryEvictor + } + return NewSampleAndChunkQueryable(lazyQueryable), exemplarQueryable, eng, evictorService } // NewSampleAndChunkQueryable creates a SampleAndChunkQueryable from a diff --git a/pkg/querier/querier_eviction_test.go b/pkg/querier/querier_eviction_test.go new file mode 100644 index 0000000000..8e643ba8fd --- /dev/null +++ b/pkg/querier/querier_eviction_test.go @@ -0,0 +1,119 @@ +package querier + +import ( + "testing" + "time" + + "github.com/go-kit/log" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + + "github.com/cortexproject/cortex/pkg/configs" + "github.com/cortexproject/cortex/pkg/ingester/client" + "github.com/cortexproject/cortex/pkg/util/flagext" + "github.com/cortexproject/cortex/pkg/util/queryeviction" + "github.com/cortexproject/cortex/pkg/util/resource" + "github.com/cortexproject/cortex/pkg/util/validation" +) + +// simpleMonitor implements resource.IMonitor for testing. 
+type simpleMonitor struct{} + +func (m *simpleMonitor) GetCPUUtilization() float64 { return 0.5 } +func (m *simpleMonitor) GetHeapUtilization() float64 { return 0.5 } + +// Compile-time check that simpleMonitor implements resource.IMonitor. +var _ resource.IMonitor = (*simpleMonitor)(nil) + +func TestQuerier_EvictionIntegration(t *testing.T) { + t.Parallel() + + tests := map[string]struct { + evictionCPU float64 + evictionHeap float64 + resourceMonitor resource.IMonitor + expectWrapped bool + expectService bool + }{ + "engine wrapped when eviction enabled and resourceMonitor provided": { + evictionCPU: 0.85, + evictionHeap: 0.85, + resourceMonitor: &simpleMonitor{}, + expectWrapped: true, + expectService: true, + }, + "engine not wrapped when eviction disabled (both thresholds 0)": { + evictionCPU: 0, + evictionHeap: 0, + resourceMonitor: &simpleMonitor{}, + expectWrapped: false, + expectService: false, + }, + "engine not wrapped when resourceMonitor is nil": { + evictionCPU: 0.85, + evictionHeap: 0.85, + resourceMonitor: nil, + expectWrapped: false, + expectService: false, + }, + "engine wrapped with CPU-only threshold": { + evictionCPU: 0.9, + evictionHeap: 0, + resourceMonitor: &simpleMonitor{}, + expectWrapped: true, + expectService: true, + }, + "engine wrapped with heap-only threshold": { + evictionCPU: 0, + evictionHeap: 0.9, + resourceMonitor: &simpleMonitor{}, + expectWrapped: true, + expectService: true, + }, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + var cfg Config + flagext.DefaultValues(&cfg) + // Disable active query tracker to avoid mmap error in tests. 
+ cfg.ActiveQueryTrackerDir = "" + + cfg.QueryProtection = configs.QueryProtection{ + Eviction: configs.EvictionConfig{ + Threshold: configs.Threshold{ + CPUUtilization: tc.evictionCPU, + HeapUtilization: tc.evictionHeap, + }, + CheckInterval: 1 * time.Second, + CooldownPeriod: 3, + EvictionMetric: "fetched_samples", + }, + } + + overrides := validation.NewOverrides(DefaultLimitsConfig(), nil) + + distributor := &MockDistributor{} + distributor.On("QueryStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything). + Return(&client.QueryStreamResponse{}, nil) + + queryables := []QueryableWithFilter{} + + _, _, eng, svc := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger(), nil, tc.resourceMonitor) + + if tc.expectWrapped { + _, ok := eng.(*queryeviction.ResourceEvictingEngine) + assert.True(t, ok, "expected engine to be *queryeviction.ResourceEvictingEngine") + } else { + _, ok := eng.(*queryeviction.ResourceEvictingEngine) + assert.False(t, ok, "expected engine NOT to be *queryeviction.ResourceEvictingEngine") + } + + if tc.expectService { + assert.NotNil(t, svc, "expected evictor service to be non-nil") + } else { + assert.Nil(t, svc, "expected evictor service to be nil") + } + }) + } +} diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go index a072abc222..f062eeaac2 100644 --- a/pkg/querier/querier_test.go +++ b/pkg/querier/querier_test.go @@ -587,7 +587,7 @@ func TestQuerier(t *testing.T) { overrides := validation.NewOverrides(DefaultLimitsConfig(), nil) queryables := []QueryableWithFilter{UseAlwaysQueryable(NewMockStoreQueryable(chunkStore))} - queryable, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger(), nil, nil) + queryable, _, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger(), nil, nil) testRangeQuery(t, queryable, queryEngine, through, query, enc) }) } @@ -691,7 +691,7 @@ func TestNoHistoricalQueryToIngester(t *testing.T) { overrides := 
validation.NewOverrides(limits, nil) ctx := user.InjectOrgID(context.Background(), "0") - queryable, _, _ := New(cfg, overrides, distributor, []QueryableWithFilter{UseAlwaysQueryable(NewMockStoreQueryable(chunkStore))}, nil, log.NewNopLogger(), nil, nil) + queryable, _, _, _ := New(cfg, overrides, distributor, []QueryableWithFilter{UseAlwaysQueryable(NewMockStoreQueryable(chunkStore))}, nil, log.NewNopLogger(), nil, nil) query, err := queryEngine.NewRangeQuery(ctx, queryable, nil, "dummy", c.mint, c.maxt, 1*time.Minute) require.NoError(t, err) @@ -784,7 +784,7 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryIntoFuture(t *testing.T) { ctx := user.InjectOrgID(context.Background(), "0") queryables := []QueryableWithFilter{UseAlwaysQueryable(NewMockStoreQueryable(chunkStore))} - queryable, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger(), nil, nil) + queryable, _, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger(), nil, nil) query, err := queryEngine.NewRangeQuery(ctx, queryable, nil, "dummy", c.queryStartTime, c.queryEndTime, time.Minute) require.NoError(t, err) @@ -876,7 +876,7 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLength(t *testing.T) { distributor := &emptyDistributor{} queryables := []QueryableWithFilter{UseAlwaysQueryable(NewMockStoreQueryable(chunkStore))} - queryable, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger(), nil, nil) + queryable, _, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger(), nil, nil) queryEngine := promql.NewEngine(opts) ctx := user.InjectOrgID(context.Background(), "test") @@ -914,7 +914,7 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLength_Series(t *testing.T) { distributor := &emptyDistributor{} queryables := []QueryableWithFilter{UseAlwaysQueryable(NewMockStoreQueryable(chunkStore))} - queryable, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger(), nil, nil) + queryable, _, _, _ := 
New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger(), nil, nil) ctx := user.InjectOrgID(context.Background(), "test") now := time.Now() @@ -972,7 +972,7 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLength_Labels(t *testing.T) { distributor := &emptyDistributor{} queryables := []QueryableWithFilter{UseAlwaysQueryable(NewMockStoreQueryable(chunkStore))} - queryable, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger(), nil, nil) + queryable, _, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger(), nil, nil) ctx := user.InjectOrgID(context.Background(), "test") @@ -1122,7 +1122,7 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLookback(t *testing.T) { distributor := &MockDistributor{} distributor.On("QueryStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&client.QueryStreamResponse{}, nil) - queryable, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger(), nil, nil) + queryable, _, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger(), nil, nil) query, err := queryEngine.NewRangeQuery(ctx, queryable, nil, testData.query, testData.queryStartTime, testData.queryEndTime, time.Minute) require.NoError(t, err) @@ -1150,7 +1150,7 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLookback(t *testing.T) { distributor.On("MetricsForLabelMatchers", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]labels.Labels{}, nil) distributor.On("MetricsForLabelMatchersStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]labels.Labels{}, nil) - queryable, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger(), nil, nil) + queryable, _, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger(), nil, nil) q, err := queryable.Querier(util.TimeToMillis(testData.queryStartTime), util.TimeToMillis(testData.queryEndTime)) require.NoError(t, 
err) @@ -1191,7 +1191,7 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLookback(t *testing.T) { distributor.On("LabelNames", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]string{}, nil) distributor.On("LabelNamesStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]string{}, nil) - queryable, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger(), nil, nil) + queryable, _, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger(), nil, nil) q, err := queryable.Querier(util.TimeToMillis(testData.queryStartTime), util.TimeToMillis(testData.queryEndTime)) require.NoError(t, err) @@ -1219,7 +1219,7 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLookback(t *testing.T) { distributor.On("MetricsForLabelMatchers", mock.Anything, mock.Anything, mock.Anything, mock.Anything, matchers).Return([]labels.Labels{}, nil) distributor.On("MetricsForLabelMatchersStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything, matchers).Return([]labels.Labels{}, nil) - queryable, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger(), nil, nil) + queryable, _, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger(), nil, nil) q, err := queryable.Querier(util.TimeToMillis(testData.queryStartTime), util.TimeToMillis(testData.queryEndTime)) require.NoError(t, err) @@ -1246,7 +1246,7 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLookback(t *testing.T) { distributor.On("LabelValuesForLabelName", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]string{}, nil) distributor.On("LabelValuesForLabelNameStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]string{}, nil) - queryable, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger(), nil, nil) + queryable, _, _, _ := New(cfg, overrides, 
distributor, queryables, nil, log.NewNopLogger(), nil, nil) q, err := queryable.Querier(util.TimeToMillis(testData.queryStartTime), util.TimeToMillis(testData.queryEndTime)) require.NoError(t, err) @@ -1583,7 +1583,7 @@ func TestShortTermQueryToLTS(t *testing.T) { limits.QueryStoreAfter = model.Duration(c.queryStoreAfter) overrides := validation.NewOverrides(limits, nil) - queryable, _, _ := New(cfg, overrides, distributor, []QueryableWithFilter{UseAlwaysQueryable(NewMockStoreQueryable(chunkStore))}, nil, log.NewNopLogger(), nil, nil) + queryable, _, _, _ := New(cfg, overrides, distributor, []QueryableWithFilter{UseAlwaysQueryable(NewMockStoreQueryable(chunkStore))}, nil, log.NewNopLogger(), nil, nil) ctx := user.InjectOrgID(context.Background(), "0") query, err := engine.NewRangeQuery(ctx, queryable, nil, "dummy", c.mint, c.maxt, 1*time.Minute) require.NoError(t, err) diff --git a/pkg/ruler/ruler_test.go b/pkg/ruler/ruler_test.go index 66c43ac46c..9c791edeae 100644 --- a/pkg/ruler/ruler_test.go +++ b/pkg/ruler/ruler_test.go @@ -242,7 +242,7 @@ func testQueryableFunc(querierTestConfig *querier.TestConfig, reg prometheus.Reg querierTestConfig.Cfg.ActiveQueryTrackerDir = "" overrides := validation.NewOverrides(querier.DefaultLimitsConfig(), nil) - q, _, _ := querier.New(querierTestConfig.Cfg, overrides, querierTestConfig.Distributor, querierTestConfig.Stores, reg, logger, nil, nil) + q, _, _, _ := querier.New(querierTestConfig.Cfg, overrides, querierTestConfig.Distributor, querierTestConfig.Stores, reg, logger, nil, nil) return func(mint, maxt int64) (storage.Querier, error) { return q.Querier(mint, maxt) } diff --git a/pkg/util/queryeviction/engine_wrapper.go b/pkg/util/queryeviction/engine_wrapper.go new file mode 100644 index 0000000000..d6342b2732 --- /dev/null +++ b/pkg/util/queryeviction/engine_wrapper.go @@ -0,0 +1,157 @@ +package queryeviction + +import ( + "context" + "time" + + "github.com/prometheus/prometheus/promql" + 
"github.com/prometheus/prometheus/promql/parser" + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/util/stats" + "github.com/thanos-io/promql-engine/logicalplan" + "github.com/weaveworks/common/user" + + "github.com/cortexproject/cortex/pkg/engine" + querier_stats "github.com/cortexproject/cortex/pkg/querier/stats" + "github.com/cortexproject/cortex/pkg/util/requestmeta" +) + +// Compile-time check that ResourceEvictingEngine implements engine.QueryEngine. +var _ engine.QueryEngine = (*ResourceEvictingEngine)(nil) + +// ResourceEvictingEngine wraps a QueryEngine to register running queries +// with a QueryRegistry, enabling resource-based eviction. +type ResourceEvictingEngine struct { + inner engine.QueryEngine + registry *QueryRegistry +} + +// NewResourceEvictingEngine wraps the given engine. +// If registry is nil, the wrapper is a no-op passthrough. +func NewResourceEvictingEngine(inner engine.QueryEngine, registry *QueryRegistry) *ResourceEvictingEngine { + return &ResourceEvictingEngine{ + inner: inner, + registry: registry, + } +} + +func (e *ResourceEvictingEngine) NewInstantQuery(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, qs string, ts time.Time) (promql.Query, error) { + query, err := e.inner.NewInstantQuery(ctx, q, opts, qs, ts) + if err != nil { + return nil, err + } + if e.registry == nil { + return query, nil + } + return e.wrapQuery(ctx, query, qs), nil +} + +func (e *ResourceEvictingEngine) NewRangeQuery(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, qs string, start, end time.Time, interval time.Duration) (promql.Query, error) { + query, err := e.inner.NewRangeQuery(ctx, q, opts, qs, start, end, interval) + if err != nil { + return nil, err + } + if e.registry == nil { + return query, nil + } + return e.wrapQuery(ctx, query, qs), nil +} + +func (e *ResourceEvictingEngine) MakeInstantQueryFromPlan(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, root 
logicalplan.Node, ts time.Time, qs string) (promql.Query, error) { + query, err := e.inner.MakeInstantQueryFromPlan(ctx, q, opts, root, ts, qs) + if err != nil { + return nil, err + } + if e.registry == nil { + return query, nil + } + return e.wrapQuery(ctx, query, qs), nil +} + +func (e *ResourceEvictingEngine) MakeRangeQueryFromPlan(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, root logicalplan.Node, start time.Time, end time.Time, interval time.Duration, qs string) (promql.Query, error) { + query, err := e.inner.MakeRangeQueryFromPlan(ctx, q, opts, root, start, end, interval, qs) + if err != nil { + return nil, err + } + if e.registry == nil { + return query, nil + } + return e.wrapQuery(ctx, query, qs), nil +} + +// wrapQuery creates a trackedQuery that registers/deregisters with the registry. +// It creates a cancellable child context so the evictor can cancel individual queries. +func (e *ResourceEvictingEngine) wrapQuery(ctx context.Context, inner promql.Query, queryExpr string) *trackedQuery { + childCtx, cancel := context.WithCancel(ctx) + + //lint:ignore faillint wrapper around upstream method + userID, _ := user.ExtractOrgID(ctx) + + return &trackedQuery{ + inner: inner, + registry: e.registry, + queryExpr: queryExpr, + userID: userID, + requestID: requestmeta.RequestIdFromContext(ctx), + cancel: cancel, + ctx: childCtx, + } +} + +// trackedQuery wraps a promql.Query to register/deregister with the registry. +type trackedQuery struct { + inner promql.Query + registry *QueryRegistry + queryID uint64 + queryExpr string + userID string + requestID string + cancel context.CancelFunc + ctx context.Context // cancellable child context +} + +// Exec registers the query, executes it with a cancellable context, +// then deregisters on completion. If the query was evicted (child context +// cancelled by evictor but parent context not cancelled), wraps the error +// as ErrQueryEvicted. 
+func (q *trackedQuery) Exec(ctx context.Context) *promql.Result { + queryStats := querier_stats.FromContext(q.ctx) + q.queryID = q.registry.Register(q.cancel, queryStats, q.queryExpr, q.userID, q.requestID) + defer q.registry.Deregister(q.queryID) + + result := q.inner.Exec(q.ctx) + + // Detect eviction: child context cancelled but parent context is still active. + if result.Err != nil && q.ctx.Err() != nil && ctx.Err() == nil { + return &promql.Result{ + Err: promql.ErrStorage{Err: &ErrQueryEvicted{}}, + } + } + + return result +} + +// Statement delegates to the inner query. +func (q *trackedQuery) Statement() parser.Statement { + return q.inner.Statement() +} + +// Stats delegates to the inner query. +func (q *trackedQuery) Stats() *stats.Statistics { + return q.inner.Stats() +} + +// Close delegates to the inner query. +func (q *trackedQuery) Close() { + q.inner.Close() +} + +// Cancel cancels the tracked query's child context. +func (q *trackedQuery) Cancel() { + q.cancel() +} + +// String delegates to the inner query. +func (q *trackedQuery) String() string { + return q.inner.String() +} diff --git a/pkg/util/queryeviction/engine_wrapper_test.go b/pkg/util/queryeviction/engine_wrapper_test.go new file mode 100644 index 0000000000..884cf1349b --- /dev/null +++ b/pkg/util/queryeviction/engine_wrapper_test.go @@ -0,0 +1,247 @@ +package queryeviction + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/prometheus/prometheus/promql" + "github.com/prometheus/prometheus/promql/parser" + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/util/stats" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/thanos-io/promql-engine/logicalplan" + + "github.com/cortexproject/cortex/pkg/engine" + querier_stats "github.com/cortexproject/cortex/pkg/querier/stats" +) + +// Compile-time check that mockEngine implements engine.QueryEngine. 
+var _ engine.QueryEngine = (*mockEngine)(nil) + +// mockEngine is a minimal implementation of engine.QueryEngine for testing. +type mockEngine struct { + query promql.Query // the query to return from all methods + err error // optional error to return +} + +func (m *mockEngine) NewInstantQuery(_ context.Context, _ storage.Queryable, _ promql.QueryOpts, _ string, _ time.Time) (promql.Query, error) { + return m.query, m.err +} + +func (m *mockEngine) NewRangeQuery(_ context.Context, _ storage.Queryable, _ promql.QueryOpts, _ string, _, _ time.Time, _ time.Duration) (promql.Query, error) { + return m.query, m.err +} + +func (m *mockEngine) MakeInstantQueryFromPlan(_ context.Context, _ storage.Queryable, _ promql.QueryOpts, _ logicalplan.Node, _ time.Time, _ string) (promql.Query, error) { + return m.query, m.err +} + +func (m *mockEngine) MakeRangeQueryFromPlan(_ context.Context, _ storage.Queryable, _ promql.QueryOpts, _ logicalplan.Node, _, _ time.Time, _ time.Duration, _ string) (promql.Query, error) { + return m.query, m.err +} + +// mockQuery is a minimal implementation of promql.Query for testing. +type mockQuery struct { + execResult *promql.Result + execFunc func(ctx context.Context) *promql.Result // optional custom exec + closed bool +} + +func (q *mockQuery) Exec(ctx context.Context) *promql.Result { + if q.execFunc != nil { + return q.execFunc(ctx) + } + return q.execResult +} + +func (q *mockQuery) Close() { + q.closed = true +} + +func (q *mockQuery) Statement() parser.Statement { + return nil +} + +func (q *mockQuery) Stats() *stats.Statistics { + return nil +} + +func (q *mockQuery) Cancel() {} + +func (q *mockQuery) String() string { + return "mock_query" +} + +// ctxWithStats returns a context that has QueryStats initialized. 
+func ctxWithStats(ctx context.Context) context.Context { + _, ctx = querier_stats.ContextWithEmptyStats(ctx) + return ctx +} + +func TestEngineWrapper_RegisterAndDeregisterDuringExec(t *testing.T) { + registry := NewQueryRegistry(testMetricFunc) + var registeredLen int + + mq := &mockQuery{ + execFunc: func(ctx context.Context) *promql.Result { + // During exec, the query should be registered. + registeredLen = registry.Len() + return &promql.Result{} + }, + } + + inner := &mockEngine{query: mq} + wrapper := NewResourceEvictingEngine(inner, registry) + + ctx := ctxWithStats(context.Background()) + query, err := wrapper.NewInstantQuery(ctx, nil, nil, "up", time.Now()) + require.NoError(t, err) + + // Before exec, registry should be empty. + assert.Equal(t, 0, registry.Len()) + + _ = query.Exec(ctx) + + // During exec, the query was registered. + assert.Equal(t, 1, registeredLen, "query should be registered during Exec") + + // After exec, the query should be deregistered. + assert.Equal(t, 0, registry.Len(), "query should be deregistered after Exec") +} + +func TestEngineWrapper_EvictedQueryReturnsErrQueryEvicted(t *testing.T) { + registry := NewQueryRegistry(testMetricFunc) + + mq := &mockQuery{ + execFunc: func(ctx context.Context) *promql.Result { + // Simulate eviction: find the registered query and cancel it. + heaviest := registry.FindHeaviest(0) + require.NotNil(t, heaviest, "query should be registered during Exec") + heaviest.Cancel() // This cancels the child context, simulating evictor behavior. + + // The inner query would see a cancelled context and return an error. 
+ return &promql.Result{Err: context.Canceled} + }, + } + + inner := &mockEngine{query: mq} + wrapper := NewResourceEvictingEngine(inner, registry) + + ctx := ctxWithStats(context.Background()) + query, err := wrapper.NewInstantQuery(ctx, nil, nil, "up", time.Now()) + require.NoError(t, err) + + result := query.Exec(ctx) + + // The result should contain ErrQueryEvicted wrapped in ErrStorage for 500 status. + require.NotNil(t, result.Err) + var storageErr promql.ErrStorage + require.ErrorAs(t, result.Err, &storageErr, "error should be promql.ErrStorage, got: %v", result.Err) + var evictedErr *ErrQueryEvicted + assert.True(t, errors.As(storageErr.Err, &evictedErr), "inner error should be ErrQueryEvicted, got: %v", storageErr.Err) +} + +func TestEngineWrapper_NonEvictedQueryReturnsNormalResult(t *testing.T) { + registry := NewQueryRegistry(testMetricFunc) + + expectedResult := &promql.Result{ + Value: promql.Scalar{T: 1000, V: 42.0}, + } + + mq := &mockQuery{execResult: expectedResult} + inner := &mockEngine{query: mq} + wrapper := NewResourceEvictingEngine(inner, registry) + + ctx := ctxWithStats(context.Background()) + query, err := wrapper.NewInstantQuery(ctx, nil, nil, "up", time.Now()) + require.NoError(t, err) + + result := query.Exec(ctx) + + // The result should be passed through unchanged. + assert.NoError(t, result.Err) + assert.Equal(t, expectedResult.Value, result.Value, "result value should be passed through unchanged") +} + +func TestEngineWrapper_NilRegistryIsNoOpPassthrough(t *testing.T) { + mq := &mockQuery{ + execResult: &promql.Result{Value: promql.Scalar{T: 1000, V: 42.0}}, + } + inner := &mockEngine{query: mq} + + // Create wrapper with nil registry — should be a no-op passthrough. + wrapper := NewResourceEvictingEngine(inner, nil) + + ctx := context.Background() + + // Test all four engine methods return the inner query directly (not wrapped). 
+ instantQuery, err := wrapper.NewInstantQuery(ctx, nil, nil, "up", time.Now()) + require.NoError(t, err) + assert.Equal(t, mq, instantQuery, "nil registry should return inner query directly for NewInstantQuery") + + rangeQuery, err := wrapper.NewRangeQuery(ctx, nil, nil, "up", time.Now(), time.Now(), time.Minute) + require.NoError(t, err) + assert.Equal(t, mq, rangeQuery, "nil registry should return inner query directly for NewRangeQuery") + + instantPlanQuery, err := wrapper.MakeInstantQueryFromPlan(ctx, nil, nil, nil, time.Now(), "up") + require.NoError(t, err) + assert.Equal(t, mq, instantPlanQuery, "nil registry should return inner query directly for MakeInstantQueryFromPlan") + + rangePlanQuery, err := wrapper.MakeRangeQueryFromPlan(ctx, nil, nil, nil, time.Now(), time.Now(), time.Minute, "up") + require.NoError(t, err) + assert.Equal(t, mq, rangePlanQuery, "nil registry should return inner query directly for MakeRangeQueryFromPlan") +} + +func TestEngineWrapper_CloseCallsInnerClose(t *testing.T) { + registry := NewQueryRegistry(testMetricFunc) + + mq := &mockQuery{ + execResult: &promql.Result{}, + } + inner := &mockEngine{query: mq} + wrapper := NewResourceEvictingEngine(inner, registry) + + ctx := ctxWithStats(context.Background()) + query, err := wrapper.NewInstantQuery(ctx, nil, nil, "up", time.Now()) + require.NoError(t, err) + + // Close the tracked query. + query.Close() + + // Verify the inner query's Close was called. + assert.True(t, mq.closed, "Close on tracked query should delegate to inner query's Close") +} + +func TestEngineWrapper_AllMethodsWrapQuery(t *testing.T) { + registry := NewQueryRegistry(testMetricFunc) + mq := &mockQuery{execResult: &promql.Result{}} + inner := &mockEngine{query: mq} + wrapper := NewResourceEvictingEngine(inner, registry) + + ctx := ctxWithStats(context.Background()) + now := time.Now() + + // All four methods should return a trackedQuery (not the raw mockQuery). 
+ q1, err := wrapper.NewInstantQuery(ctx, nil, nil, "up", now) + require.NoError(t, err) + _, ok := q1.(*trackedQuery) + assert.True(t, ok, "NewInstantQuery should return a trackedQuery") + + q2, err := wrapper.NewRangeQuery(ctx, nil, nil, "up", now, now, time.Minute) + require.NoError(t, err) + _, ok = q2.(*trackedQuery) + assert.True(t, ok, "NewRangeQuery should return a trackedQuery") + + q3, err := wrapper.MakeInstantQueryFromPlan(ctx, nil, nil, nil, now, "up") + require.NoError(t, err) + _, ok = q3.(*trackedQuery) + assert.True(t, ok, "MakeInstantQueryFromPlan should return a trackedQuery") + + q4, err := wrapper.MakeRangeQueryFromPlan(ctx, nil, nil, nil, now, now, time.Minute, "up") + require.NoError(t, err) + _, ok = q4.(*trackedQuery) + assert.True(t, ok, "MakeRangeQueryFromPlan should return a trackedQuery") +} diff --git a/pkg/util/queryeviction/evictor.go b/pkg/util/queryeviction/evictor.go new file mode 100644 index 0000000000..08b835f482 --- /dev/null +++ b/pkg/util/queryeviction/evictor.go @@ -0,0 +1,150 @@ +package queryeviction + +import ( + "context" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + + "github.com/cortexproject/cortex/pkg/util/resource" + "github.com/cortexproject/cortex/pkg/util/services" +) + +// EvictionConfig configures the resource-based query evictor. +type EvictionConfig struct { + CPUUtilization float64 `yaml:"cpu_utilization"` + HeapUtilization float64 `yaml:"heap_utilization"` + CheckInterval time.Duration `yaml:"check_interval"` + CooldownPeriod int `yaml:"cooldown_period"` + EvictionMetric string `yaml:"eviction_metric"` + MinQueryAge time.Duration `yaml:"min_query_age"` +} + +// Enabled returns true if at least one threshold is > 0. 
+func (c EvictionConfig) Enabled() bool { + return c.CPUUtilization > 0 || c.HeapUtilization > 0 +} + +// QueryEvictor monitors system-wide resource utilization and evicts +// the heaviest running query when thresholds are breached. +type QueryEvictor struct { + services.Service + + monitor resource.IMonitor + registry *QueryRegistry + cfg EvictionConfig + logger log.Logger + + // Prometheus metrics + evictionsTotal *prometheus.CounterVec // labels: resource, component +} + +// NewQueryEvictor creates a new evictor. Returns nil if config is disabled. +func NewQueryEvictor( + monitor resource.IMonitor, + registry *QueryRegistry, + cfg EvictionConfig, + logger log.Logger, + reg prometheus.Registerer, + component string, +) (*QueryEvictor, error) { + if !cfg.Enabled() { + return nil, nil + } + + e := &QueryEvictor{ + monitor: monitor, + registry: registry, + cfg: cfg, + logger: logger, + evictionsTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_query_evictions_total", + Help: "Total number of queries evicted due to resource pressure.", + ConstLabels: map[string]string{"component": component}, + }, []string{"resource"}), + } + + e.Service = services.NewBasicService(nil, e.running, nil) + return e, nil +} + +// running is the main loop (called by services.Service). +func (e *QueryEvictor) running(ctx context.Context) error { + ticker := time.NewTicker(e.cfg.CheckInterval) + defer ticker.Stop() + + cooldownRemaining := 0 + + for { + select { + case <-ctx.Done(): + return nil + + case <-ticker.C: + // If in cooldown, decrement and skip this tick. + if cooldownRemaining > 0 { + cooldownRemaining-- + continue + } + + // Check system-wide resource utilization. + breachedResource, utilization, threshold := e.checkThresholds() + if breachedResource == "" { + continue // no breach + } + + // Find the heaviest running query. 
+ heaviest := e.registry.FindHeaviest(e.cfg.MinQueryAge) + if heaviest == nil { + continue // no running queries to evict + } + + // Evict the heaviest query. + metricValue := e.registry.metric(heaviest.Stats) + heaviest.Cancel() + + // Log the eviction. + level.Warn(e.logger).Log( + "msg", "evicting heaviest query due to resource pressure", + "resource", breachedResource, + "utilization", utilization, + "threshold", threshold, + "request_id", heaviest.RequestID, + "query", heaviest.QueryExpr, + "user", heaviest.UserID, + "metric", e.cfg.EvictionMetric, + "metric_value", metricValue, + ) + + // Increment metrics. + e.evictionsTotal.WithLabelValues(string(breachedResource)).Inc() + + // Enter cooldown. + cooldownRemaining = e.cfg.CooldownPeriod + } + } +} + +// checkThresholds returns the first breached resource type, its current +// utilization, and the configured threshold. Returns ("", 0, 0) if no breach. +// CPU is checked before heap (deterministic priority). +func (e *QueryEvictor) checkThresholds() (resource.Type, float64, float64) { + if e.cfg.CPUUtilization > 0 { + cpuUtil := e.monitor.GetCPUUtilization() + if cpuUtil >= e.cfg.CPUUtilization { + return resource.CPU, cpuUtil, e.cfg.CPUUtilization + } + } + + if e.cfg.HeapUtilization > 0 { + heapUtil := e.monitor.GetHeapUtilization() + if heapUtil >= e.cfg.HeapUtilization { + return resource.Heap, heapUtil, e.cfg.HeapUtilization + } + } + + return "", 0, 0 +} diff --git a/pkg/util/queryeviction/evictor_test.go b/pkg/util/queryeviction/evictor_test.go new file mode 100644 index 0000000000..4a104ee06a --- /dev/null +++ b/pkg/util/queryeviction/evictor_test.go @@ -0,0 +1,233 @@ +package queryeviction + +import ( + "context" + "testing" + "time" + + "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" + promtest "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/atomic" + + querier_stats 
"github.com/cortexproject/cortex/pkg/querier/stats" + "github.com/cortexproject/cortex/pkg/util/resource" + "github.com/cortexproject/cortex/pkg/util/services" +) + +type mockMonitor struct { + cpuUtil atomic.Float64 + heapUtil atomic.Float64 +} + +func newMockMonitor(cpu, heap float64) *mockMonitor { + m := &mockMonitor{} + m.cpuUtil.Store(cpu) + m.heapUtil.Store(heap) + return m +} + +func (m *mockMonitor) GetCPUUtilization() float64 { return m.cpuUtil.Load() } +func (m *mockMonitor) GetHeapUtilization() float64 { return m.heapUtil.Load() } + +func testEvictorConfig(cpu, heap float64, cooldown int) EvictionConfig { + return EvictionConfig{ + CPUUtilization: cpu, + HeapUtilization: heap, + CheckInterval: 10 * time.Millisecond, + CooldownPeriod: cooldown, + EvictionMetric: "fetched_samples", + } +} + +// startEvictor creates and starts an evictor, returning it and a cleanup function. +func startEvictor(t *testing.T, mon *mockMonitor, reg *QueryRegistry, cfg EvictionConfig) *QueryEvictor { + t.Helper() + evictor, err := NewQueryEvictor(mon, reg, cfg, log.NewNopLogger(), prometheus.NewPedanticRegistry(), "test") + require.NoError(t, err) + require.NotNil(t, evictor) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), evictor)) + t.Cleanup(func() { services.StopAndAwaitTerminated(context.Background(), evictor) }) //nolint:errcheck + return evictor +} + +// registerTestQuery registers a query and returns a channel closed on eviction. 
+func registerTestQuery(reg *QueryRegistry, fetchedSamples uint64, expr, user string) (uint64, chan struct{}) { + ctx, cancel := context.WithCancel(context.Background()) + stats := &querier_stats.QueryStats{} + stats.AddFetchedSamples(fetchedSamples) + + evicted := make(chan struct{}) + id := reg.Register(func() { cancel(); close(evicted) }, stats, expr, user, "") + _ = ctx + return id, evicted +} + +func waitEvicted(t *testing.T, ch chan struct{}) { + t.Helper() + select { + case <-ch: + case <-time.After(500 * time.Millisecond): + t.Fatal("expected query to be evicted") + } +} + +func assertNotEvicted(t *testing.T, ch chan struct{}, wait time.Duration) { + t.Helper() + select { + case <-ch: + t.Fatal("query should not have been evicted") + case <-time.After(wait): + } +} + +func TestEviction_OccursWhenAboveThreshold(t *testing.T) { + reg := NewQueryRegistry(testMetricFunc) + _, evicted := registerTestQuery(reg, 1000, "up", "user1") + startEvictor(t, newMockMonitor(0.95, 0.0), reg, testEvictorConfig(0.9, 0, 0)) + waitEvicted(t, evicted) +} + +func TestNoEviction_WhenBelowThreshold(t *testing.T) { + reg := NewQueryRegistry(testMetricFunc) + _, evicted := registerTestQuery(reg, 1000, "up", "user1") + startEvictor(t, newMockMonitor(0.5, 0.5), reg, testEvictorConfig(0.9, 0.9, 0)) + assertNotEvicted(t, evicted, 100*time.Millisecond) + assert.Equal(t, 1, reg.Len()) +} + +func TestCooldown_BlocksEvictionThenResumes(t *testing.T) { + mon := newMockMonitor(0.95, 0.0) + reg := NewQueryRegistry(testMetricFunc) + _, evicted1 := registerTestQuery(reg, 1000, "q1", "user1") + + // cooldown=3 ticks × 10ms = 30ms + startEvictor(t, mon, reg, testEvictorConfig(0.9, 0, 3)) + waitEvicted(t, evicted1) + + // Second query registered during cooldown should not be evicted immediately. + _, evicted2 := registerTestQuery(reg, 2000, "q2", "user2") + assertNotEvicted(t, evicted2, 20*time.Millisecond) + + // But should be evicted after cooldown expires. 
+ waitEvicted(t, evicted2) +} + +func TestCPUOnlyThreshold(t *testing.T) { + reg := NewQueryRegistry(testMetricFunc) + _, evicted := registerTestQuery(reg, 1000, "up", "user1") + startEvictor(t, newMockMonitor(0.95, 0.95), reg, testEvictorConfig(0.9, 0, 0)) + waitEvicted(t, evicted) +} + +func TestHeapOnlyThreshold(t *testing.T) { + reg := NewQueryRegistry(testMetricFunc) + _, evicted := registerTestQuery(reg, 1000, "up", "user1") + startEvictor(t, newMockMonitor(0.0, 0.95), reg, testEvictorConfig(0, 0.9, 0)) + waitEvicted(t, evicted) +} + +func TestCPUCheckedBeforeHeap(t *testing.T) { + reg := NewQueryRegistry(testMetricFunc) + _, evicted := registerTestQuery(reg, 1000, "up", "user1") + evictor := startEvictor(t, newMockMonitor(0.95, 0.95), reg, testEvictorConfig(0.9, 0.9, 0)) + waitEvicted(t, evicted) + + assert.Equal(t, float64(1), promtest.ToFloat64(evictor.evictionsTotal.WithLabelValues(string(resource.CPU)))) + assert.Equal(t, float64(0), promtest.ToFloat64(evictor.evictionsTotal.WithLabelValues(string(resource.Heap)))) +} + +func TestEmptyRegistry_NoPanic(t *testing.T) { + reg := NewQueryRegistry(testMetricFunc) + evictor := startEvictor(t, newMockMonitor(0.95, 0.95), reg, testEvictorConfig(0.9, 0.9, 0)) + + time.Sleep(50 * time.Millisecond) + + assert.Equal(t, float64(0), promtest.ToFloat64(evictor.evictionsTotal.WithLabelValues(string(resource.CPU)))) + assert.Equal(t, float64(0), promtest.ToFloat64(evictor.evictionsTotal.WithLabelValues(string(resource.Heap)))) +} + +func TestCheckThresholds(t *testing.T) { + reg := NewQueryRegistry(testMetricFunc) + + tests := map[string]struct { + cpuUtil, heapUtil float64 + cpuThresh, heapThresh float64 + wantResource resource.Type + wantUtil, wantThreshold float64 + }{ + "CPU breached": {0.95, 0.5, 0.9, 0.9, resource.CPU, 0.95, 0.9}, + "Heap breached": {0.5, 0.95, 0.9, 0.9, resource.Heap, 0.95, 0.9}, + "Both breached, CPU first": {0.92, 0.93, 0.9, 0.9, resource.CPU, 0.92, 0.9}, + "Neither breached": {0.5, 0.5, 0.9, 
0.9, "", 0, 0}, + "CPU disabled, heap breached": {0.95, 0.95, 0, 0.9, resource.Heap, 0.95, 0.9}, + "Heap disabled, CPU breached": {0.95, 0.95, 0.9, 0, resource.CPU, 0.95, 0.9}, + "Exact threshold triggers": {0.9, 0.5, 0.9, 0.9, resource.CPU, 0.9, 0.9}, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + mon := newMockMonitor(tc.cpuUtil, tc.heapUtil) + cfg := EvictionConfig{ + CPUUtilization: tc.cpuThresh, + HeapUtilization: tc.heapThresh, + CheckInterval: time.Second, + EvictionMetric: "fetched_samples", + } + + var evictor *QueryEvictor + if cfg.Enabled() { + var err error + evictor, err = NewQueryEvictor(mon, reg, cfg, log.NewNopLogger(), prometheus.NewPedanticRegistry(), "test") + require.NoError(t, err) + } else { + evictor = &QueryEvictor{monitor: mon, cfg: cfg} + } + + resType, util, thresh := evictor.checkThresholds() + assert.Equal(t, tc.wantResource, resType) + assert.Equal(t, tc.wantUtil, util) + assert.Equal(t, tc.wantThreshold, thresh) + }) + } +} + +func TestPrometheusMetrics_IncrementedCorrectly(t *testing.T) { + reg := NewQueryRegistry(testMetricFunc) + evictor := startEvictor(t, newMockMonitor(0.95, 0.0), reg, testEvictorConfig(0.9, 0, 0)) + + for i := range 3 { + _, evicted := registerTestQuery(reg, uint64(1000+i), "q", "user") + waitEvicted(t, evicted) + } + + assert.Equal(t, float64(3), promtest.ToFloat64(evictor.evictionsTotal.WithLabelValues(string(resource.CPU)))) +} + +func TestNewQueryEvictor_ReturnsNilWhenDisabled(t *testing.T) { + cfg := EvictionConfig{CheckInterval: time.Second, EvictionMetric: "fetched_samples"} + evictor, err := NewQueryEvictor(newMockMonitor(0, 0), NewQueryRegistry(testMetricFunc), cfg, log.NewNopLogger(), prometheus.NewPedanticRegistry(), "test") + assert.NoError(t, err) + assert.Nil(t, evictor) +} + +func TestEviction_HeaviestQueryIsEvicted(t *testing.T) { + reg := NewQueryRegistry(testMetricFunc) + _, evictedSmall := registerTestQuery(reg, 100, "small", "user1") + _, evictedLarge := 
registerTestQuery(reg, 10000, "large", "user2")
+	_, evictedMedium := registerTestQuery(reg, 500, "medium", "user3")
+
+	startEvictor(t, newMockMonitor(0.95, 0.0), reg, testEvictorConfig(0.9, 0, 0))
+
+	select {
+	case <-evictedLarge:
+	case <-evictedSmall:
+		t.Fatal("small query evicted before heaviest")
+	case <-evictedMedium:
+		t.Fatal("medium query evicted before heaviest")
+	case <-time.After(500 * time.Millisecond):
+		t.Fatal("expected heaviest query to be evicted")
+	}
+}
diff --git a/pkg/util/queryeviction/registry.go b/pkg/util/queryeviction/registry.go
new file mode 100644
index 0000000000..89dd6cc0b6
--- /dev/null
+++ b/pkg/util/queryeviction/registry.go
@@ -0,0 +1,139 @@
+package queryeviction
+
+import (
+	"context"
+	"fmt"
+	"sync"
+	"time"
+
+	"github.com/gogo/status"
+	"google.golang.org/grpc/codes"
+
+	querier_stats "github.com/cortexproject/cortex/pkg/querier/stats"
+)
+
+// ErrQueryEvicted is returned when a query is cancelled by the evictor.
+type ErrQueryEvicted struct{}
+
+func (e *ErrQueryEvicted) Error() string {
+	return status.Error(codes.ResourceExhausted, "resource limit reached").Error()
+}
+
+// QueryEntry represents a single running query in the registry.
+type QueryEntry struct {
+	QueryID      uint64
+	Cancel       context.CancelFunc
+	Stats        *querier_stats.QueryStats
+	QueryExpr    string // PromQL expression for logging
+	UserID       string // tenant ID for logging/metrics
+	RequestID    string // request ID for correlation
+	RegisteredAt time.Time
+}
+
+// MetricFunc extracts a comparable weight value from QueryStats.
+// Higher values mean "heavier" query.
+type MetricFunc func(s *querier_stats.QueryStats) uint64
+
+// QueryRegistry tracks all currently running queries.
+type QueryRegistry struct {
+	mu      sync.RWMutex
+	queries map[uint64]*QueryEntry
+	nextID  uint64
+	metric  MetricFunc // configurable: defaults to fetched_samples (LoadFetchedSamples)
+}
+
+// NewQueryRegistry creates a registry with the given metric function. 
+func NewQueryRegistry(metric MetricFunc) *QueryRegistry { + return &QueryRegistry{ + queries: make(map[uint64]*QueryEntry), + metric: metric, + } +} + +// Register adds a running query and returns its unique, monotonically increasing ID. +func (r *QueryRegistry) Register(cancel context.CancelFunc, stats *querier_stats.QueryStats, queryExpr string, userID string, requestID string) uint64 { + r.mu.Lock() + defer r.mu.Unlock() + + r.nextID++ + id := r.nextID + + r.queries[id] = &QueryEntry{ + QueryID: id, + Cancel: cancel, + Stats: stats, + QueryExpr: queryExpr, + UserID: userID, + RequestID: requestID, + RegisteredAt: time.Now(), + } + + return id +} + +// Deregister removes a query from the registry. +// It is a no-op if the ID is not found. +func (r *QueryRegistry) Deregister(id uint64) { + r.mu.Lock() + defer r.mu.Unlock() + + delete(r.queries, id) +} + +// FindHeaviest returns the entry with the highest metric value +// among queries that have been running for at least minAge, +// or nil if no eligible queries exist. +func (r *QueryRegistry) FindHeaviest(minAge time.Duration) *QueryEntry { + r.mu.RLock() + defer r.mu.RUnlock() + + var heaviest *QueryEntry + var maxWeight uint64 + now := time.Now() + + for _, entry := range r.queries { + if now.Sub(entry.RegisteredAt) < minAge { + continue + } + weight := r.metric(entry.Stats) + if heaviest == nil || weight > maxWeight { + heaviest = entry + maxWeight = weight + } + } + + return heaviest +} + +// Len returns the number of currently registered queries. +func (r *QueryRegistry) Len() int { + r.mu.RLock() + defer r.mu.RUnlock() + + return len(r.queries) +} + +// ResolveMetricFunc returns the MetricFunc for the given metric name. +// An empty string defaults to "fetched_samples". 
+func ResolveMetricFunc(metricName string) (MetricFunc, error) { + switch metricName { + case "fetched_samples", "": + return func(s *querier_stats.QueryStats) uint64 { + return s.LoadFetchedSamples() + }, nil + case "fetched_series": + return func(s *querier_stats.QueryStats) uint64 { + return s.LoadFetchedSeries() + }, nil + case "fetched_chunks": + return func(s *querier_stats.QueryStats) uint64 { + return s.LoadFetchedChunks() + }, nil + case "fetched_chunk_bytes": + return func(s *querier_stats.QueryStats) uint64 { + return s.LoadFetchedChunkBytes() + }, nil + default: + return nil, fmt.Errorf("unsupported eviction metric: %s", metricName) + } +} diff --git a/pkg/util/queryeviction/registry_test.go b/pkg/util/queryeviction/registry_test.go new file mode 100644 index 0000000000..c52181809c --- /dev/null +++ b/pkg/util/queryeviction/registry_test.go @@ -0,0 +1,199 @@ +package queryeviction + +import ( + "context" + "sync" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + querier_stats "github.com/cortexproject/cortex/pkg/querier/stats" +) + +// newTestStats creates a QueryStats with the given fetched samples value. +func newTestStats(fetchedSamples uint64) *querier_stats.QueryStats { + stats := &querier_stats.QueryStats{} + stats.AddFetchedSamples(fetchedSamples) + return stats +} + +// testMetricFunc is a MetricFunc that returns FetchedSamples for testing. +func testMetricFunc(s *querier_stats.QueryStats) uint64 { + return s.LoadFetchedSamples() +} + +func TestRegister_UniqueMonotonicallyIncreasingIDs(t *testing.T) { + reg := NewQueryRegistry(testMetricFunc) + _, cancel := context.WithCancel(context.Background()) + defer cancel() + + ids := make([]uint64, 10) + for i := range 10 { + ids[i] = reg.Register(cancel, newTestStats(0), "query", "user", "") + } + + for i := 1; i < len(ids); i++ { + assert.Greater(t, ids[i], ids[i-1], "IDs should be monotonically increasing") + } + + // Verify all IDs are unique. 
+ seen := make(map[uint64]bool) + for _, id := range ids { + assert.False(t, seen[id], "ID %d should be unique", id) + seen[id] = true + } +} + +func TestDeregister_RemovesEntry(t *testing.T) { + reg := NewQueryRegistry(testMetricFunc) + _, cancel := context.WithCancel(context.Background()) + defer cancel() + + id := reg.Register(cancel, newTestStats(100), "query", "user", "") + require.Equal(t, 1, reg.Len()) + + // FindHeaviest should return the registered entry. + heaviest := reg.FindHeaviest(0) + require.NotNil(t, heaviest) + assert.Equal(t, id, heaviest.QueryID) + + // Deregister and verify it's gone. + reg.Deregister(id) + assert.Equal(t, 0, reg.Len()) + assert.Nil(t, reg.FindHeaviest(0), "FindHeaviest should return nil after deregistering the only entry") +} + +func TestDeregister_UnknownID_IsNoOp(t *testing.T) { + reg := NewQueryRegistry(testMetricFunc) + _, cancel := context.WithCancel(context.Background()) + defer cancel() + + id := reg.Register(cancel, newTestStats(100), "query", "user", "") + + // Deregister an ID that was never registered. + reg.Deregister(99999) + + // Original entry should still be present. 
+ assert.Equal(t, 1, reg.Len()) + heaviest := reg.FindHeaviest(0) + require.NotNil(t, heaviest) + assert.Equal(t, id, heaviest.QueryID) +} + +func TestFindHeaviest_ReturnsHighestMetricValue(t *testing.T) { + reg := NewQueryRegistry(testMetricFunc) + _, cancel := context.WithCancel(context.Background()) + defer cancel() + + reg.Register(cancel, newTestStats(100), "small-query", "user1", "") + reg.Register(cancel, newTestStats(500), "medium-query", "user2", "") + heaviestID := reg.Register(cancel, newTestStats(1000), "large-query", "user3", "") + reg.Register(cancel, newTestStats(200), "another-query", "user4", "") + + heaviest := reg.FindHeaviest(0) + require.NotNil(t, heaviest) + assert.Equal(t, heaviestID, heaviest.QueryID) + assert.Equal(t, "large-query", heaviest.QueryExpr) + assert.Equal(t, uint64(1000), heaviest.Stats.LoadFetchedSamples()) +} + +func TestFindHeaviest_EmptyRegistry(t *testing.T) { + reg := NewQueryRegistry(testMetricFunc) + assert.Nil(t, reg.FindHeaviest(0), "FindHeaviest should return nil for empty registry") +} + +func TestLen_ReflectsCurrentCount(t *testing.T) { + reg := NewQueryRegistry(testMetricFunc) + _, cancel := context.WithCancel(context.Background()) + defer cancel() + + assert.Equal(t, 0, reg.Len()) + + id1 := reg.Register(cancel, newTestStats(10), "q1", "u1", "") + assert.Equal(t, 1, reg.Len()) + + id2 := reg.Register(cancel, newTestStats(20), "q2", "u2", "") + assert.Equal(t, 2, reg.Len()) + + id3 := reg.Register(cancel, newTestStats(30), "q3", "u3", "") + assert.Equal(t, 3, reg.Len()) + + reg.Deregister(id2) + assert.Equal(t, 2, reg.Len()) + + reg.Deregister(id1) + assert.Equal(t, 1, reg.Len()) + + reg.Deregister(id3) + assert.Equal(t, 0, reg.Len()) +} + +func TestConcurrent_RegisterDeregisterFindHeaviest(t *testing.T) { + reg := NewQueryRegistry(testMetricFunc) + + const goroutines = 20 + const opsPerGoroutine = 100 + + var wg sync.WaitGroup + wg.Add(goroutines) + + for range goroutines { + go func() { + defer wg.Done() + for i 
:= range opsPerGoroutine { + _, cancel := context.WithCancel(context.Background()) + stats := newTestStats(uint64(i)) + id := reg.Register(cancel, stats, "concurrent-query", "user", "") + + // Interleave FindHeaviest and Len calls. + _ = reg.FindHeaviest(0) + _ = reg.Len() + + reg.Deregister(id) + cancel() + } + }() + } + + wg.Wait() + + // After all goroutines complete, registry should be empty. + assert.Equal(t, 0, reg.Len()) +} + +func TestResolveMetricFunc_AllSupportedMetrics(t *testing.T) { + stats := &querier_stats.QueryStats{} + stats.AddFetchedSamples(20) + stats.AddFetchedSeries(30) + stats.AddFetchedChunks(40) + stats.AddFetchedChunkBytes(50) + + tests := []struct { + name string + metricName string + expectedValue uint64 + }{ + {name: "fetched_samples", metricName: "fetched_samples", expectedValue: 20}, + {name: "empty string defaults to fetched_samples", metricName: "", expectedValue: 20}, + {name: "fetched_series", metricName: "fetched_series", expectedValue: 30}, + {name: "fetched_chunks", metricName: "fetched_chunks", expectedValue: 40}, + {name: "fetched_chunk_bytes", metricName: "fetched_chunk_bytes", expectedValue: 50}, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + fn, err := ResolveMetricFunc(tc.metricName) + require.NoError(t, err) + require.NotNil(t, fn) + assert.Equal(t, tc.expectedValue, fn(stats)) + }) + } +} + +func TestResolveMetricFunc_UnsupportedMetric(t *testing.T) { + fn, err := ResolveMetricFunc("unknown_metric") + assert.Error(t, err) + assert.Nil(t, fn) + assert.Contains(t, err.Error(), "unsupported eviction metric") +} diff --git a/schemas/cortex-config-schema.json b/schemas/cortex-config-schema.json index ed2549651f..63d7339902 100644 --- a/schemas/cortex-config-schema.json +++ b/schemas/cortex-config-schema.json @@ -4881,6 +4881,54 @@ }, "query_protection": { "properties": { + "eviction": { + "properties": { + "check_interval": { + "default": "1s", + "description": "EXPERIMENTAL: How frequently the 
evictor checks system resource utilization.", + "type": "string", + "x-cli-flag": "ingester.query-protection.eviction.check-interval", + "x-format": "duration" + }, + "cooldown_period": { + "default": 3, + "description": "EXPERIMENTAL: Number of check intervals to wait after an eviction before evicting again.", + "type": "number", + "x-cli-flag": "ingester.query-protection.eviction.cooldown-period" + }, + "eviction_metric": { + "default": "fetched_samples", + "description": "EXPERIMENTAL: The query metric used to determine the heaviest query for eviction. Supported values: fetched_samples, fetched_series, fetched_chunks, fetched_chunk_bytes.", + "type": "string", + "x-cli-flag": "ingester.query-protection.eviction.eviction-metric" + }, + "min_query_age": { + "default": "10s", + "description": "EXPERIMENTAL: Minimum time a query must be running before it becomes eligible for eviction. Queries younger than this are ignored.", + "type": "string", + "x-cli-flag": "ingester.query-protection.eviction.min-query-age", + "x-format": "duration" + }, + "threshold": { + "properties": { + "cpu_utilization": { + "default": 0, + "description": "EXPERIMENTAL: Max CPU utilization that this instance can reach before evicting the heaviest running query (across all tenants) in percentage, between 0 and 1. monitored_resources config must include the resource type. 0 to disable.", + "type": "number", + "x-cli-flag": "ingester.query-protection.eviction.threshold.cpu-utilization" + }, + "heap_utilization": { + "default": 0, + "description": "EXPERIMENTAL: Max heap utilization that this instance can reach before evicting the heaviest running query (across all tenants) in percentage, between 0 and 1. monitored_resources config must include the resource type. 
0 to disable.", + "type": "number", + "x-cli-flag": "ingester.query-protection.eviction.threshold.heap-utilization" + } + }, + "type": "object" + } + }, + "type": "object" + }, "rejection": { "properties": { "threshold": { @@ -6044,6 +6092,54 @@ }, "query_protection": { "properties": { + "eviction": { + "properties": { + "check_interval": { + "default": "1s", + "description": "EXPERIMENTAL: How frequently the evictor checks system resource utilization.", + "type": "string", + "x-cli-flag": "querier.query-protection.eviction.check-interval", + "x-format": "duration" + }, + "cooldown_period": { + "default": 3, + "description": "EXPERIMENTAL: Number of check intervals to wait after an eviction before evicting again.", + "type": "number", + "x-cli-flag": "querier.query-protection.eviction.cooldown-period" + }, + "eviction_metric": { + "default": "fetched_samples", + "description": "EXPERIMENTAL: The query metric used to determine the heaviest query for eviction. Supported values: fetched_samples, fetched_series, fetched_chunks, fetched_chunk_bytes.", + "type": "string", + "x-cli-flag": "querier.query-protection.eviction.eviction-metric" + }, + "min_query_age": { + "default": "10s", + "description": "EXPERIMENTAL: Minimum time a query must be running before it becomes eligible for eviction. Queries younger than this are ignored.", + "type": "string", + "x-cli-flag": "querier.query-protection.eviction.min-query-age", + "x-format": "duration" + }, + "threshold": { + "properties": { + "cpu_utilization": { + "default": 0, + "description": "EXPERIMENTAL: Max CPU utilization that this instance can reach before evicting the heaviest running query (across all tenants) in percentage, between 0 and 1. monitored_resources config must include the resource type. 
0 to disable.", + "type": "number", + "x-cli-flag": "querier.query-protection.eviction.threshold.cpu-utilization" + }, + "heap_utilization": { + "default": 0, + "description": "EXPERIMENTAL: Max heap utilization that this instance can reach before evicting the heaviest running query (across all tenants) in percentage, between 0 and 1. monitored_resources config must include the resource type. 0 to disable.", + "type": "number", + "x-cli-flag": "querier.query-protection.eviction.threshold.heap-utilization" + } + }, + "type": "object" + } + }, + "type": "object" + }, "rejection": { "properties": { "threshold": { @@ -8410,6 +8506,54 @@ }, "query_protection": { "properties": { + "eviction": { + "properties": { + "check_interval": { + "default": "1s", + "description": "EXPERIMENTAL: How frequently the evictor checks system resource utilization.", + "type": "string", + "x-cli-flag": "store-gateway.query-protection.eviction.check-interval", + "x-format": "duration" + }, + "cooldown_period": { + "default": 3, + "description": "EXPERIMENTAL: Number of check intervals to wait after an eviction before evicting again.", + "type": "number", + "x-cli-flag": "store-gateway.query-protection.eviction.cooldown-period" + }, + "eviction_metric": { + "default": "fetched_samples", + "description": "EXPERIMENTAL: The query metric used to determine the heaviest query for eviction. Supported values: fetched_samples, fetched_series, fetched_chunks, fetched_chunk_bytes.", + "type": "string", + "x-cli-flag": "store-gateway.query-protection.eviction.eviction-metric" + }, + "min_query_age": { + "default": "10s", + "description": "EXPERIMENTAL: Minimum time a query must be running before it becomes eligible for eviction. 
Queries younger than this are ignored.", + "type": "string", + "x-cli-flag": "store-gateway.query-protection.eviction.min-query-age", + "x-format": "duration" + }, + "threshold": { + "properties": { + "cpu_utilization": { + "default": 0, + "description": "EXPERIMENTAL: Max CPU utilization that this instance can reach before evicting the heaviest running query (across all tenants) in percentage, between 0 and 1. monitored_resources config must include the resource type. 0 to disable.", + "type": "number", + "x-cli-flag": "store-gateway.query-protection.eviction.threshold.cpu-utilization" + }, + "heap_utilization": { + "default": 0, + "description": "EXPERIMENTAL: Max heap utilization that this instance can reach before evicting the heaviest running query (across all tenants) in percentage, between 0 and 1. monitored_resources config must include the resource type. 0 to disable.", + "type": "number", + "x-cli-flag": "store-gateway.query-protection.eviction.threshold.heap-utilization" + } + }, + "type": "object" + } + }, + "type": "object" + }, "rejection": { "properties": { "threshold": {