Skip to content

feat(read path): proof-of-concept server-streaming query APIs (V2)#5154

Draft
simonswine wants to merge 6 commits into
grafana:mainfrom
simonswine:20260513_streaming-query-results
Draft

feat(read path): proof-of-concept server-streaming query APIs (V2)#5154
simonswine wants to merge 6 commits into
grafana:mainfrom
simonswine:20260513_streaming-query-results

Conversation

@simonswine

@simonswine simonswine commented May 14, 2026

Copy link
Copy Markdown
Contributor

Summary

This PR adds proof-of-concept server-streaming query RPCs to the V2 read path, enabling the UI to render incremental flamegraph and timeline results as they arrive rather than waiting for the full query to complete.

No behavioural change on the unary path. The existing Invoke / SelectMergeStacktraces / SelectSeries RPCs are unchanged. The new streaming endpoints return CodeUnimplemented on V1 routes, and the UI falls back to unary automatically.

What's in this PR

Backend — pkg/querybackend

  • InvokeStream server-streaming RPC: same execution engine as Invoke but emits IndexLookupEvent messages as each block finishes its TSDB dataset-index lookup, periodic SnapshotEvent messages carrying intermediate merged results (full-replace, not delta), and a final TerminalEvent with the fully-merged result and diagnostics.
  • reportAggregator.snapshot(): safe concurrent read of merger state for mid-stream snapshots without blocking block workers.
  • Merge-node streaming: fans out InvokeStream to child shards concurrently; accumulates per-child latest reports and rebuilds a fresh merge on each snapshot tick (avoids double-counting between snapshots).

Query frontend — pkg/frontend/readpath

  • SelectMergeStacktracesStream and SelectSeriesStream Connect server-streaming handlers.
  • Each handler fans out InvokeStream to query-backend shards, forwards IndexLookupEventQueryPlanUpdate messages, coalesces backend SnapshotEventQuerySnapshot / SeriesChunk messages at ~250 ms cadence, and emits a final QueryResult.

CLI — cmd/profilecli

  • profilecli query flamegraph-stream and profilecli query series-stream subcommands for end-to-end testing without the UI.

UI — ui/src

  • api/streaming.ts: Connect framing over fetch, AbortController-based cancellation.
  • usePyroscopeQuery: two concurrent streaming subscriptions (flamegraph + timeline); CodeUnimplemented falls back to unary.
  • Progress bar: QueryPlanUpdate drives blocksTotal / datasetsTotal; SnapshotEvent drives blocksDone / datasetsDone.
  • TimeSeries axis pinned so the y-axis doesn't jump as chunks arrive.

Proto

  • api/query/v1/query.proto: InvokeStream RPC + InvokeStreamEvent / IndexLookupEvent / SnapshotEvent / TerminalEvent messages.
  • api/querier/v1/querier.proto: SelectMergeStacktracesStream / SelectSeriesStream RPCs + SelectMergeStacktracesPartial / SelectSeriesPartial / QueryPlanUpdate / QuerySnapshot / SeriesChunk / QueryResult messages.

Non-goals / deferred

  • V1 streaming (would require scheduler protocol changes)
  • Delta wire format for tree snapshots
  • bytes_total_estimate denominator in the UI progress bar (shown as auxiliary "scanned X MB" only)
  • Observability metrics for the streaming path
  • Per-tenant feature flag gating

Architecture notes

  • Transport is Connect server-streaming (HTTP/2 + Connect framing), not WebSocket. Auth, cancellation, metrics, and ingress all reuse existing paths.
  • datasets_total in QueryPlanUpdate is a running value that increases as IndexLookupEvent messages arrive; the UI treats it as mutable.
  • Snapshots are full-replace on the client (not delta) because maxNodes pruning is non-monotone.
  • Cancellation: AbortController.abort() → fetch body close → HTTP/2 RST_STREAM → server ctx.Done() propagates through the errgroup chain.

Test plan

  • go test ./pkg/querybackend/... — report aggregator snapshot safety
  • go test ./pkg/frontend/readpath/... — streaming handler unit tests
  • profilecli query flamegraph-stream against a running V2 cluster — verify IndexLookupEvent messages arrive before snapshots
  • UI: open flamegraph page — verify progress bar fills incrementally and final result matches unary response
  • UI: change query while stream is in flight — verify cancellation and no stale data
  • UI against V1 backend — verify silent fallback to unary (no error shown)

Note

Medium Risk
Introduces new public streaming RPC surfaces and message types, which may affect client generation and compatibility if consumers start depending on the new contracts. Most changes are generated schema/proto code, but they expand API surface area and streaming semantics.

Overview
Adds new server-streaming RPC endpoints for querying (InvokeStream, SelectMergeStacktracesStream, SelectSeriesStream) in the generated OpenAPI specs, alongside a new dedicated QuerierStreamService spec.

Defines the streaming event/message model in generated schemas and Go protos (e.g., InvokeStreamEvent with IndexLookupEvent/SnapshotEvent/TerminalEvent, and querier-side QueryPlanUpdate/QuerySnapshot/SeriesChunk/QueryResult wrapped in Select*Partial oneofs) to support incremental progress updates and partial results until a final terminal result.

Updates generated google.v1.Profile Go code to drop parquet struct tags from several fields (generated code shape change only).

Reviewed by Cursor Bugbot for commit 00cc041. Bugbot is set up for automated code reviews on this repo. Configure here.

Adds InvokeStream to the query-backend service and SelectMergeStacktracesStream /
SelectSeriesStream to the querier service, enabling the read path to push
incremental results to the client as blocks are processed rather than waiting
for all blocks to complete.

Key design points:
- BlockReader emits an IndexLookupEvent per block immediately after TSDB dataset
  resolution, allowing the frontend to show a running query-plan total without
  a separate pre-call that was doubling latency.
- A 250ms snapshot ticker in BlockReader sends partial aggregation results while
  blocks are still in flight.
- QueryFrontend.StreamSelectMergeStacktraces / StreamSelectSeries translate
  backend events to UI-facing SelectMergeStacktracesPartial / SelectSeriesPartial
  messages (QueryPlanUpdate, QuerySnapshot/SeriesChunk, QueryResult).
- The Router uses a type assertion to inProcessStreamer so the streaming path
  never pays a network round-trip through a ServerStreamForClient proxy.
- To resolve the Go naming conflict between QuerierServiceClient and
  QuerierServiceHandler on server-streaming methods, Router fields use a narrow
  unaryQuerierSvc interface, and wrappers (LogSpanParametersWrapper,
  diagnostics.Wrapper) now wrap QuerierServiceHandler. RegisterPyroscopeHandlers
  and NewHTTPHandlers accept the new HTTPQueryClient interface.
- V1 querier and frontend receive CodeUnimplemented stubs; no V1 code paths
  were changed.
…mmands

Introduces a dedicated QuerierStreamService (separate from QuerierServiceHandler
to avoid Go interface conflicts on server-streaming methods) with
SelectMergeStacktracesStream and SelectSeriesStream RPCs, wires it into the
query frontend, and exposes it via the profilecli `query stream` subcommands.

Additional changes:
- Backend mergeStream emits periodic 250ms snapshots via an independent goroutine
  rather than relying solely on child-backend events, ensuring the client sees
  progress even if backends batch their snapshots.
- Fix data race in TreeMerger: add Bytes() helper that holds the internal mutex,
  and update treeAggregator.build() to use it (with symbolLock held) instead of
  calling Tree().Bytes() without synchronisation.
- Fix data race in timeseries.Merger: hold mu in TimeSeries() and Top(), and
  return a new Series struct from mergeTimeSeries() so callers don't alias the
  internal map entry.
- Remove stale parquet struct tags from google/v1/profile.pb.go (regenerated).
Replace the unary fetchFlamegraph/fetchTimeline pair in
usePyroscopeQuery with two concurrent Connect server-streams against
QuerierStreamService. Partial flamegraph and series data render as
each ~500ms progress event arrives, and the panel headers surface
bytes scanned and ETA while a query is in flight. A per-run abort
controller plus a monotonic run-id token guarantees stale callbacks
(including those from the unary fallback) cannot clobber newer
queries. When the server returns 404 / 501 or an end-stream
"unimplemented" code, the hook silently falls back to the existing
unary RPCs so the UI keeps working against V1 deployments.

The streaming client is hand-rolled over fetch + ReadableStream
(~80 LOC parser plus dispatch glue) to avoid pulling in
@connectrpc/connect-web and a protobuf codegen pipeline.
Surface streaming progress as a thin determinate bar at the top of
each Panel, filled to bytes_done / bytes_total_estimate. Falls back to
0% before any IndexLookup events arrive so the bar appears immediately
on query start.

Pin the TimeSeries chart's x-axis to the resolved query window
exposed from usePyroscopeQuery as queryWindow. Previously the axis
recomputed rangeEnd from Date.now() on every render, so it drifted by
milliseconds on each progress event and disappeared entirely when the
incoming series buffer was empty. Now the axis renders immediately
from queryWindow.start/end and stays put across progress updates;
the area/line paths simply omit themselves until the first data point
arrives. parseRangeMs is no longer needed and is removed.
mergeStream previously fed each child Snapshot/Terminal event into a
single running reportAggregator via aggregateResponse, but child
snapshots carry the cumulative state of every block that child has
merged so far — not a delta. Each new child snapshot therefore
re-added every prior block, inflating sample counts roughly with the
square of the number of snapshots a child emitted. The visible effect
in the UI was a flamegraph whose implicit "others" bucket grew
monotonically with every parent tick (because maxNodes prunes a wider
tail as totals balloon).

Hold the latest reports per child slot and rebuild a fresh merge
aggregator on each parent snapshot tick (and once more after the
errgroup joins, for the terminal event). Replacement semantics give
each child exactly one contribution at any moment, so leaf samples
are counted once regardless of how many intermediate snapshots a
child emitted.

The per-tick rebuild parses every child's latest tree bytes again,
which is more work than incremental merging — at 4 ticks/sec with 20
children emitting ~100 KB trees that's roughly 8 MB/sec of parsing.
Tolerable for now; a delta wire format remains the long-term fix
(deferred per the streaming plan).
…overcounting

mergeTimeSeries() mutates Series.Points in-place: it extends the first
child's Points slice with the second child's points, then modifies
Point.Value fields when summing duplicates at the same timestamp. Because
buildAggregate() fed raw cs.reports pointers into each fresh aggregator,
a second call would find the first child's series already carrying A+B
values and add B again, yielding A+2B. This compounded on every 250ms
snapshot tick. The same race could corrupt the final terminal result if
the snapshot goroutine fired in the window between g.Wait() and cancel().

Fix: clone each child's reports with CloneVT() before aggregating.
Mutations from mergeTimeSeries() now only affect temporary copies,
leaving cs.reports untouched across all buildAggregate() calls.
@simonswine simonswine marked this pull request as draft May 14, 2026 12:12

@cursor cursor Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cursor Bugbot has reviewed your changes and found 1 potential issue.

Fix All in Cursor

Bugbot Autofix is ON, but it could not run because the branch was deleted or merged before autofix could start.

Reviewed by Cursor Bugbot for commit 00cc041. Configure here.

sizeCache protoimpl.SizeCache `parquet:"-"`
DefaultSampleType int64 `protobuf:"varint,14,opt,name=default_sample_type,json=defaultSampleType,proto3" json:"default_sample_type,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Parquet struct tags removed from generated protobuf file

Low Severity

The parquet struct tags on Profile, ValueType, Sample, and Label have been stripped. The Makefile generate target runs tools/add-parquet-tags.sh to add these tags after protobuf code generation. This commit's output diverges from what make generate produces, meaning the next full regeneration will create an unexpected diff re-adding the tags. Likely the proto file was regenerated without running the full build pipeline.

Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit 00cc041. Configure here.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant