From ce9aaa5fea674fa2527603ff40c3245da92cd2b0 Mon Sep 17 00:00:00 2001 From: Aravinda-HWK Date: Sun, 21 Jun 2026 14:57:01 +0530 Subject: [PATCH 1/2] feat: incremental (delta) sync for mailbox refresh Implements Phase 3 of the Email 2.0 proposal: refresh now fetches only what changed instead of re-listing the whole folder. Because go-imap v1 exposes no CONDSTORE/MODSEQ, the delta is computed from UIDs + flags rather than a MODSEQ token: Server - New GET /api/v1/mailboxes/{mailbox}/changes endpoint returning a MailboxDelta {uidvalidity, total, resync, added, flags, removed}. - imap.MailboxChanges: UID SEARCH (since+1):* for new envelopes, a flags-only UID FETCH over the client's known UIDs to derive flag changes + removals; UIDVALIDITY mismatch signals a full resync. - ListMessages now also returns uidvalidity so the client has a baseline. - Factored shared fetchEnvelopes/fetchFlags helpers. Client - Dexie v2 syncMeta table persists uidvalidity per folder. - mailboxes.changes() endpoint + MailboxDelta/FlagUpdate types. - DataContext reconciles added/removed/flag changes idempotently (keyed on UID) and exposes refreshFolder(role); a ThreadList refresh button wires it into Inbox/Sent/Trash. Performance - refreshFolder syncs only the viewed folder, so four folder syncs no longer serialize on the session's single IMAP connection. - IMAPFor drops a redundant per-request NOOP probe (one RTT) since every operation already self-heals via ensureLive. Tests: parseUIDList unit tests; server build + go test and frontend typecheck pass. --- server/internal/http/handlers/mailboxes.go | 63 ++++++- .../internal/http/handlers/mailboxes_test.go | 48 ++++++ server/internal/http/router.go | 1 + server/internal/imap/client.go | 159 +++++++++++++++--- server/internal/mail/types.go | 30 ++++ server/internal/session/store.go | 13 +- src/nonview/api/endpoints.ts | 20 +++ src/nonview/api/types.ts | 23 +++ src/nonview/cache/db.ts | Bin 5367 -> 6966 bytes src/nonview/core/DataContext.tsx | 136 ++++++++++++++- src/view/moles/ThreadList.tsx | 79 ++++++--- src/view/pages/InboxPage.tsx | 3 +- src/view/pages/SentPage.tsx | 2 + src/view/pages/TrashPage.tsx | 2 + 14 files changed, 523 insertions(+), 56 deletions(-) create mode 100644 server/internal/http/handlers/mailboxes_test.go diff --git a/server/internal/http/handlers/mailboxes.go b/server/internal/http/handlers/mailboxes.go index 3e951b1..ac44ffe 100644 --- a/server/internal/http/handlers/mailboxes.go +++ b/server/internal/http/handlers/mailboxes.go @@ -5,6 +5,7 @@ import ( "net/http" "net/url" "strconv" + "strings" "github.com/go-chi/chi/v5" @@ -63,14 +64,72 @@ func (h *Mailboxes) ListMessages(w http.ResponseWriter, r *http.Request) { httpx.WriteError(w, r, h.Logger, httpx.NewAPIError(http.StatusBadGateway, httpx.CodeUpstreamFailed, "imap connect", err)) return } - envelopes, total, err := c.ListMessages(r.Context(), mailbox, limit, uint32(before)) + envelopes, total, uidvalidity, err := c.ListMessages(r.Context(), mailbox, limit, uint32(before)) if err != nil { httpx.WriteError(w, r, h.Logger, httpx.NewAPIError(http.StatusBadGateway, httpx.CodeUpstreamFailed, "list messages", err)) return } - resp := map[string]any{"messages": envelopes, "total": total} + resp := map[string]any{"messages": envelopes, "total": total, "uidvalidity": uidvalidity} if len(envelopes) > 0 { resp["next_before"] = envelopes[len(envelopes)-1].UID } httpx.WriteJSON(w, http.StatusOK, resp) } + +// Changes returns an incremental-sync delta for the named mailbox (proposal §6), +// so the client refreshes by fetching only what changed rather than re-listing. +// +// Query params: +// - uidvalidity (the client's cached UIDVALIDITY; 0/absent on first sync) +// - known (comma-separated UIDs the client currently holds) +// - limit (1..200, default 50 — caps how many new envelopes are returned) +func (h *Mailboxes) Changes(w http.ResponseWriter, r *http.Request) { + sess, ok := middleware.SessionFrom(r.Context()) + if !ok { + httpx.WriteError(w, r, h.Logger, httpx.NewAPIError(http.StatusUnauthorized, httpx.CodeUnauthorized, "no session", nil)) + return + } + mailbox, err := url.PathUnescape(chi.URLParam(r, "mailbox")) + if err != nil || mailbox == "" { + httpx.WriteError(w, r, h.Logger, httpx.NewAPIError(http.StatusBadRequest, httpx.CodeBadRequest, "invalid mailbox name", err)) + return + } + q := r.URL.Query() + uidvalidity, _ := strconv.ParseUint(q.Get("uidvalidity"), 10, 32) + limit, _ := strconv.Atoi(q.Get("limit")) + known := parseUIDList(q.Get("known")) + + c, err := h.Sessions.IMAPFor(r.Context(), sess) + if err != nil { + httpx.WriteError(w, r, h.Logger, httpx.NewAPIError(http.StatusBadGateway, httpx.CodeUpstreamFailed, "imap connect", err)) + return + } + delta, err := c.MailboxChanges(r.Context(), mailbox, uint32(uidvalidity), known, limit) + if err != nil { + httpx.WriteError(w, r, h.Logger, httpx.NewAPIError(http.StatusBadGateway, httpx.CodeUpstreamFailed, "mailbox changes", err)) + return + } + httpx.WriteJSON(w, http.StatusOK, delta) +} + +// parseUIDList parses a comma-separated list of UIDs, skipping any malformed or +// zero entries. Caps the input to a sane bound so a runaway query string can't +// force an unbounded FETCH. +func parseUIDList(s string) []uint32 { + if s == "" { + return nil + } + parts := strings.Split(s, ",") + if len(parts) > 1000 { + parts = parts[:1000] + } + out := make([]uint32, 0, len(parts)) + for _, p := range parts { + n, err := strconv.ParseUint(strings.TrimSpace(p), 10, 32) + if err != nil || n == 0 { + continue + } + out = append(out, uint32(n)) + } + return out +} diff --git a/server/internal/http/handlers/mailboxes_test.go b/server/internal/http/handlers/mailboxes_test.go new file mode 100644 index 0000000..97eb95d --- /dev/null +++ b/server/internal/http/handlers/mailboxes_test.go @@ -0,0 +1,48 @@ +package handlers + +import ( + "reflect" + "testing" +) + +func TestParseUIDList(t *testing.T) { + tests := []struct { + name string + in string + want []uint32 + }{ + {"empty", "", nil}, + {"single", "42", []uint32{42}}, + {"multiple", "1,2,3", []uint32{1, 2, 3}}, + {"whitespace", " 1 , 2 ,3 ", []uint32{1, 2, 3}}, + {"skips zero", "0,5,0,7", []uint32{5, 7}}, + {"skips malformed", "1,abc,3,,5", []uint32{1, 3, 5}}, + {"all invalid", "x,y,z", []uint32{}}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := parseUIDList(tt.in) + if len(got) == 0 && len(tt.want) == 0 { + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("parseUIDList(%q) = %v, want %v", tt.in, got, tt.want) + } + }) + } +} + +func TestParseUIDListCapsInput(t *testing.T) { + // Build a 1500-entry list; parser must cap the parsed set at 1000 so a + // runaway query string can't force an unbounded FETCH. + in := "" + for i := 0; i < 1500; i++ { + if i > 0 { + in += "," + } + in += "1" + } + if got := len(parseUIDList(in)); got > 1000 { + t.Errorf("parseUIDList did not cap input: got %d entries, want <= 1000", got) + } +} diff --git a/server/internal/http/router.go b/server/internal/http/router.go index caec548..50eaee5 100644 --- a/server/internal/http/router.go +++ b/server/internal/http/router.go @@ -63,6 +63,7 @@ func NewRouter(d Deps) http.Handler { r.Use(requireSession) r.Get("/mailboxes", mboxH.List) r.Get("/mailboxes/{mailbox}/messages", mboxH.ListMessages) + r.Get("/mailboxes/{mailbox}/changes", mboxH.Changes) r.Get("/mailboxes/{mailbox}/messages/{uid}", msgH.Get) r.Patch("/mailboxes/{mailbox}/messages/{uid}/flags", msgH.SetFlags) r.Delete("/mailboxes/{mailbox}/messages/{uid}", msgH.Delete) diff --git a/server/internal/imap/client.go b/server/internal/imap/client.go index 2e285d9..9dbcb4a 100644 --- a/server/internal/imap/client.go +++ b/server/internal/imap/client.go @@ -176,26 +176,26 @@ func (c *Client) selectMailbox(name string, readOnly bool) error { // ListMessages returns up to limit envelopes from the given mailbox, newest first. // If before > 0, only messages with UID < before are returned (cursor-style paging). // The returned total is the mailbox's full message count (independent of the -// page), suitable for a "1–50 of N" pager. -func (c *Client) ListMessages(ctx context.Context, mailbox string, limit int, before uint32) ([]hmail.Envelope, uint32, error) { +// page), suitable for a "1–50 of N" pager. uidvalidity is the mailbox's current +// UIDVALIDITY, which the client persists to detect cache-invalidating changes +// on a later delta sync. +func (c *Client) ListMessages(ctx context.Context, mailbox string, limit int, before uint32) (envs []hmail.Envelope, total, uidvalidity uint32, err error) { if limit <= 0 || limit > 200 { limit = 50 } c.mu.Lock() defer c.mu.Unlock() if err := c.ensureLive(ctx); err != nil { - return nil, 0, err - } - if err := c.selectMailbox(mailbox, true); err != nil { - return nil, 0, err + return nil, 0, 0, err } mbox, err := c.conn.Select(mailbox, true) if err != nil { - return nil, 0, err + return nil, 0, 0, fmt.Errorf("select %q: %w", mailbox, err) } - total := mbox.Messages + c.selected = mailbox + total, uidvalidity = mbox.Messages, mbox.UidValidity if total == 0 { - return []hmail.Envelope{}, 0, nil + return []hmail.Envelope{}, 0, uidvalidity, nil } // Fetch the highest UID first; if before is set, cap the upper bound there. @@ -207,35 +207,150 @@ func (c *Client) ListMessages(ctx context.Context, mailbox string, limit int, be } uids, err := c.conn.UidSearch(criteria) if err != nil { - return nil, 0, fmt.Errorf("uid search: %w", err) + return nil, 0, 0, fmt.Errorf("uid search: %w", err) } if len(uids) == 0 { - return []hmail.Envelope{}, total, nil + return []hmail.Envelope{}, total, uidvalidity, nil } // Newest first; take the last `limit` UIDs. if len(uids) > limit { uids = uids[len(uids)-limit:] } + envelopes, err := c.fetchEnvelopes(uids) + if err != nil { + return nil, 0, 0, err + } + reverseEnvelopes(envelopes) // UID-ascending fetch → newest-first + return envelopes, total, uidvalidity, nil +} + +// MailboxChanges computes an incremental-sync delta for a mailbox given the +// client's last-known state (proposal §6). knownValidity is the UIDVALIDITY the +// client cached for this folder (0 if none); known is the set of UIDs it +// currently holds. See hmail.MailboxDelta for the contract. +// +// Caller does not need the mailbox SELECTed beforehand. +func (c *Client) MailboxChanges(ctx context.Context, mailbox string, knownValidity uint32, known []uint32, limit int) (hmail.MailboxDelta, error) { + if limit <= 0 || limit > 200 { + limit = 50 + } + c.mu.Lock() + defer c.mu.Unlock() + + var delta hmail.MailboxDelta + if err := c.ensureLive(ctx); err != nil { + return delta, err + } + mbox, err := c.conn.Select(mailbox, true) + if err != nil { + return delta, fmt.Errorf("select %q: %w", mailbox, err) + } + c.selected = mailbox + delta.UIDValidity, delta.Total = mbox.UidValidity, mbox.Messages + + // UIDVALIDITY changed → the client's cached UIDs no longer identify the same + // messages. Signal a full resync and skip the (now meaningless) diff. + if knownValidity != 0 && knownValidity != mbox.UidValidity { + delta.Resync = true + return delta, nil + } + + // Watermark: IMAP UIDs increase monotonically, so anything strictly greater + // than the highest UID the client holds is genuinely new. + var sinceUID uint32 + for _, u := range known { + if u > sinceUID { + sinceUID = u + } + } + + // 1. Added — UIDs in (sinceUID+1):*, capped to the newest `limit`. + if mbox.Messages > 0 && sinceUID < ^uint32(0) { + crit := imap.NewSearchCriteria() + seq := new(imap.SeqSet) + seq.AddRange(sinceUID+1, 0) // "(sinceUID+1):*" — 0 means "*" + crit.Uid = seq + newUIDs, err := c.conn.UidSearch(crit) + if err != nil { + return delta, fmt.Errorf("uid search added: %w", err) + } + if len(newUIDs) > limit { + newUIDs = newUIDs[len(newUIDs)-limit:] + } + if len(newUIDs) > 0 { + added, err := c.fetchEnvelopes(newUIDs) + if err != nil { + return delta, err + } + reverseEnvelopes(added) // newest-first, matching ListMessages + delta.Added = added + } + } + + // 2. Flags + removals among the known set. A known UID absent from the + // FLAGS fetch has been expunged or moved away. + if len(known) > 0 { + present, err := c.fetchFlags(known) + if err != nil { + return delta, err + } + for _, u := range known { + if fl, ok := present[u]; ok { + delta.Flags = append(delta.Flags, hmail.FlagUpdate{UID: u, Flags: fl}) + } else { + delta.Removed = append(delta.Removed, u) + } + } + } + return delta, nil +} + +// fetchEnvelopes fetches list-view envelopes for the given UIDs in UID-ascending +// order. Caller must hold c.mu and have the mailbox SELECTed. +func (c *Client) fetchEnvelopes(uids []uint32) ([]hmail.Envelope, error) { seq := new(imap.SeqSet) seq.AddNum(uids...) - msgs := make(chan *imap.Message, len(uids)) - fetchDone := make(chan error, 1) + done := make(chan error, 1) items := []imap.FetchItem{imap.FetchEnvelope, imap.FetchFlags, imap.FetchUid, imap.FetchBodyStructure} - go func() { fetchDone <- c.conn.UidFetch(seq, items, msgs) }() + go func() { done <- c.conn.UidFetch(seq, items, msgs) }() - var envelopes []hmail.Envelope + var out []hmail.Envelope for m := range msgs { - envelopes = append(envelopes, envelopeFrom(m)) + out = append(out, envelopeFrom(m)) } - if err := <-fetchDone; err != nil { - return nil, 0, fmt.Errorf("fetch envelopes: %w", err) + if err := <-done; err != nil { + return nil, fmt.Errorf("fetch envelopes: %w", err) } - // Sort newest-first by UID descending. - for i, j := 0, len(envelopes)-1; i < j; i, j = i+1, j-1 { - envelopes[i], envelopes[j] = envelopes[j], envelopes[i] + return out, nil +} + +// fetchFlags fetches only the flags for the given UIDs, returned as a uid→flags +// map. UIDs that no longer exist are simply omitted from the result. Caller must +// hold c.mu and have the mailbox SELECTed. +func (c *Client) fetchFlags(uids []uint32) (map[uint32][]string, error) { + seq := new(imap.SeqSet) + seq.AddNum(uids...) + msgs := make(chan *imap.Message, len(uids)) + done := make(chan error, 1) + items := []imap.FetchItem{imap.FetchFlags, imap.FetchUid} + go func() { done <- c.conn.UidFetch(seq, items, msgs) }() + + out := make(map[uint32][]string, len(uids)) + for m := range msgs { + out[m.Uid] = append([]string(nil), m.Flags...) + } + if err := <-done; err != nil { + return nil, fmt.Errorf("fetch flags: %w", err) + } + return out, nil +} + +// reverseEnvelopes flips a UID-ascending slice in place to newest-first. +func reverseEnvelopes(e []hmail.Envelope) { + for i, j := 0, len(e)-1; i < j; i, j = i+1, j-1 { + e[i], e[j] = e[j], e[i] } - return envelopes, total, nil } func envelopeFrom(m *imap.Message) hmail.Envelope { diff --git a/server/internal/mail/types.go b/server/internal/mail/types.go index 408e1eb..492405e 100644 --- a/server/internal/mail/types.go +++ b/server/internal/mail/types.go @@ -48,6 +48,36 @@ type Envelope struct { Preview string `json:"preview,omitempty"` } +// FlagUpdate carries the current IMAP flags for a single known message. The +// delta endpoint returns one per message the client already knows about and is +// still present upstream; the client diffs these against its cached flags to +// detect read/unread (and other flag) transitions. +type FlagUpdate struct { + UID uint32 `json:"uid"` + Flags []string `json:"flags"` +} + +// MailboxDelta is the incremental-sync payload for one mailbox (proposal §6). +// +// Because go-imap v1 exposes no CONDSTORE/MODSEQ, the delta is computed from +// UIDs and flags rather than a MODSEQ token: the client sends the UIDs it +// already has, and the server returns only what changed — +// - Added: full envelopes for UIDs newer than the client's highest known UID. +// - Flags: current flags for known UIDs still present (client diffs locally). +// - Removed: known UIDs that have since been expunged/moved away. +// +// UIDVALIDITY is the cache-coherence guard: if it differs from the client's +// stored value the cached UIDs are meaningless, so Resync is set and the client +// must discard its cache and refetch the folder from scratch. +type MailboxDelta struct { + UIDValidity uint32 `json:"uidvalidity"` + Total uint32 `json:"total"` + Resync bool `json:"resync"` + Added []Envelope `json:"added"` + Flags []FlagUpdate `json:"flags"` + Removed []uint32 `json:"removed"` +} + // AttachmentMeta describes an attachment without including its bytes. type AttachmentMeta struct { ID string `json:"id"` diff --git a/server/internal/session/store.go b/server/internal/session/store.go index e83b409..c795e4b 100644 --- a/server/internal/session/store.go +++ b/server/internal/session/store.go @@ -158,16 +158,17 @@ func (s *Store) Get(id string) (*Session, bool) { } // IMAPFor returns the live IMAP client for the session, transparently -// reconnecting from sealed credentials if the previous connection was dropped. +// reconnecting from sealed credentials if there is no connection yet. +// +// We deliberately do NOT probe the existing connection with a NOOP here: every +// Client operation already calls ensureLive, which NOOPs and self-heals from the +// stored credentials on failure. Probing here too would add a full extra +// round-trip to the IMAP server (~one RTT) on every request for no benefit. func (s *Store) IMAPFor(ctx context.Context, sess *Session) (*imap.Client, error) { sess.mu.Lock() defer sess.mu.Unlock() if sess.imap != nil { - if err := sess.imap.Ping(ctx); err == nil { - return sess.imap, nil - } - _ = sess.imap.Close() - sess.imap = nil + return sess.imap, nil } creds, err := s.sealer.Open(sess.sealed) if err != nil { diff --git a/src/nonview/api/endpoints.ts b/src/nonview/api/endpoints.ts index d5b6fe6..3f1adc1 100644 --- a/src/nonview/api/endpoints.ts +++ b/src/nonview/api/endpoints.ts @@ -6,6 +6,7 @@ import type { APIClient } from "./client"; import type { LoginRequest, LoginResponse, + MailboxDelta, MailboxListResponse, Message, MessageListResponse, @@ -42,6 +43,25 @@ export const mailboxes = { }`; return client.get(path, signal); }, + // Incremental sync (proposal §6): given the client's cached UIDVALIDITY and + // the UIDs it already holds, return only what changed. `known` is sent as a + // comma-separated UID list; the server derives the "since" watermark from it. + changes( + client: APIClient, + mailbox: string, + opts: { uidvalidity?: number; known?: number[]; limit?: number } = {}, + signal?: AbortSignal, + ) { + const params = new URLSearchParams(); + if (opts.uidvalidity) params.set("uidvalidity", String(opts.uidvalidity)); + if (opts.limit) params.set("limit", String(opts.limit)); + if (opts.known && opts.known.length) params.set("known", opts.known.join(",")); + const qs = params.toString(); + const path = `${v1}/mailboxes/${encodeURIComponent(mailbox)}/changes${ + qs ? `?${qs}` : "" + }`; + return client.get(path, signal); + }, }; export const messages = { diff --git a/src/nonview/api/types.ts b/src/nonview/api/types.ts index 90fb1b1..0c9d149 100644 --- a/src/nonview/api/types.ts +++ b/src/nonview/api/types.ts @@ -79,6 +79,29 @@ export interface MessageListResponse { next_before?: number; // Total messages in the mailbox (independent of the page), for "1–50 of N". total?: number; + // The mailbox's UIDVALIDITY, persisted so a later delta sync can detect a + // cache-invalidating change upstream. + uidvalidity?: number; +} + +// FlagUpdate is the current flag set for one known message (see MailboxDelta). +export interface FlagUpdate { + uid: number; + flags: string[]; +} + +// MailboxDelta is the incremental-sync payload from GET .../changes. Mirrors +// server/internal/mail/types.go MailboxDelta. `added` are full envelopes for +// genuinely-new messages; `flags` are current flags for known messages still +// present (the client diffs them); `removed` are known UIDs now gone. When +// `resync` is true the client must discard its cache and refetch the folder. +export interface MailboxDelta { + uidvalidity: number; + total: number; + resync: boolean; + added: Envelope[] | null; + flags: FlagUpdate[] | null; + removed: number[] | null; } export interface APIErrorBody { diff --git a/src/nonview/cache/db.ts b/src/nonview/cache/db.ts index c7617a9ba5bf5db3a8c3f07d70cc2bcc7d35c9d3..d19e9de40a25806c534288b0118666fc442a3cd1 100644 GIT binary patch delta 1126 zcmZuwPiqrF6sOYGl0Zybm8yrnwh9S0+t`ChDwIM!1T7YA3xWq{ciuLGo1JB5Hfd-} zj(&hS_yIh4@lYuA=Eb8I1wVj-H^H-b^UZcsQ|MljdHd$~-tW);sh-?F!m<9+5^SQX zg)-u(K=!-%_8{p{sGLlnTUM3hk72A>{dN^l?vWrk>JAgUV+eHS_sXJ zwao%S1}tb{Es!l#7zYTuDO0=wBFwrpstyfPn8>~|8W?a9hOX5BL8iEvwPhgO!EHC! zsY{_-Eka;1&l>Wj+o?9-BjJT;XfB6P$u{UfF*QKMnX zxZ4VK3p#?ig)lMa@x+P+RDr7|QIZ3yHbZNf;@EUs! z!c{bL1!x`%0+|xKyChzO)Q}e%_UY)%ES-nMY(Sj0eN-#1oN#9B*4Ud{ka^<<%r#(( z`4K)GOg#QNRvzhto6}ha#<_;%LQ6xba$?SB57u!{AbF*fKn--rM53`7mT4*Cu9tkt z-D_HeG?1!GPC^<7Ef(*R)GSa^llhOyeKJ)G8H1m;6EA~tdV6`fvRw7FktA}(N#q{4 zhNIMgc~Wd~pvv*L@z2F_jh;-^Jdk#z?AfEsrAGzl;4|loi`;2STts%(o;25+TQA&} zAY-K8EYy{{3tsFh*?bN?H1`*HH-C8*nz^1{SmOZ&Y% z(|7(=tIivsFql-u?t57Eh75$&Rj9j;Lt2g!m?h$@`}9rWFk_KNGb1w0K>FD?LPLV5ghsyR~RzBd4}@220{zthG4auu%r{s)FBeaZj; delta 34 scmV+-0Nww#Huov8Wd)Ne2X?c@2L%DMq6xhVvy2!B0h6H_m6LfIz3U Promise; prevPage: (role: string) => Promise; refresh: () => Promise; + // Delta-sync just one folder by role ("inbox" | "sent" | "drafts" | "trash"). + refreshFolder: (role: string) => Promise; getThread: (id: string) => Thread | undefined; getMessages: (threadId: string) => Promise; // Cache-first body read; returns null if nothing is cached for the thread. @@ -304,6 +308,8 @@ export const DataProvider: React.FC = ({ children }) => { const fresh = msgs.map((e) => envelopeToThread(e, mailbox || "")); set(fresh); void writeThreads(account, role, fresh); + // Persist UIDVALIDITY so a later refresh can sync via a delta. + if (val.uidvalidity) void writeUidValidity(account, role, val.uidvalidity); // Reset to page 0; cursor for page 1 is this page's next_before. pageCursors.current[role] = [undefined, val.next_before]; setPage((p) => ({ ...p, [role]: 0 })); @@ -353,7 +359,7 @@ export const DataProvider: React.FC = ({ children }) => { return () => ctrl.abort(); }, [isAuthenticated, loadAll, hydrateFromCache]); - const refresh = useCallback(() => loadAll(), [loadAll]); + // `refresh` is defined below, after the pagination helpers it depends on. // Maps a role to its current thread array's state setter. const setterForRole = useCallback( @@ -401,7 +407,10 @@ export const DataProvider: React.FC = ({ children }) => { cursors[target + 1] = resp.next_before; pageCursors.current[role] = cursors; // Only page 0 is mirrored to the offline cache (the "newest" view). - if (target === 0) void writeThreads(account, role, fresh); + if (target === 0) { + void writeThreads(account, role, fresh); + if (resp.uidvalidity) void writeUidValidity(account, role, resp.uidvalidity); + } } catch (err) { if ((err as { name?: string })?.name !== "AbortError") { setError((err as Error).message || "Failed to load page"); @@ -438,6 +447,128 @@ export const DataProvider: React.FC = ({ children }) => { [page, pageLoading, fetchPage], ); + // Incremental sync for one folder (proposal §6). Diffs the cached page-0 view + // against the server and applies only added/removed/flag-changed messages, + // instead of re-listing the whole page. Falls back to a full page-0 fetch when + // there's no baseline to diff against, or when the server signals a resync + // (UIDVALIDITY changed → cached UIDs are stale). + const syncRole = useCallback( + async (role: string): Promise => { + const mailbox = rolesRef.current[role]; + const set = setterForRole(role); + if (!mailbox || !set) return; + + const cached = await readThreads(account, role); + const known = cached.map((t) => t.uid).filter((u) => u > 0); + const uidvalidity = await readUidValidity(account, role); + // Cold folder or unknown UIDVALIDITY → nothing to diff; do a full fetch. + if (!uidvalidity || known.length === 0) { + await fetchPage(role, 0, undefined); + return; + } + + const delta = await mailboxesAPI.changes(apiClient, mailbox, { + uidvalidity, + known, + limit: PAGE_SIZE, + }); + if (delta.resync) { + await fetchPage(role, 0, undefined); + return; + } + + // Reconcile against the cached baseline, keyed on UID and applied + // idempotently so repeated syncs converge to the same result. + let next = cached.slice(); + + if (delta.removed?.length) { + const gone = new Set(delta.removed); + next = next.filter((t) => !gone.has(t.uid)); + } + if (delta.flags?.length) { + const flagsByUid = new Map(delta.flags.map((f) => [f.uid, f.flags])); + next = next.map((t) => { + const fl = flagsByUid.get(t.uid); + if (!fl) return t; + const unread = fl.includes("\\Seen") ? 0 : 1; + return unread === t.unreadCount ? t : { ...t, unreadCount: unread }; + }); + } + if (delta.added?.length) { + const have = new Set(next.map((t) => t.uid)); + const fresh = delta.added + .filter((e) => !have.has(e.uid)) + .map((e) => envelopeToThread(e, mailbox)); + next = [...fresh, ...next]; + } + + // Keep the newest-first page-0 window. + next.sort( + (a, b) => + new Date(b.lastMessageTime).getTime() - + new Date(a.lastMessageTime).getTime(), + ); + if (next.length > PAGE_SIZE) next = next.slice(0, PAGE_SIZE); + + set(next); + void writeThreads(account, role, next); + void writeUidValidity(account, role, delta.uidvalidity); + setPage((p) => ({ ...p, [role]: 0 })); + setTotal((t) => ({ ...t, [role]: delta.total })); + // Cursor for page 1 is the oldest UID currently shown. + const oldestUid = next.reduce( + (min, t) => (t.uid > 0 && t.uid < min ? t.uid : min), + Number.MAX_SAFE_INTEGER, + ); + pageCursors.current[role] = [ + undefined, + oldestUid === Number.MAX_SAFE_INTEGER ? undefined : oldestUid, + ]; + }, + [apiClient, account, setterForRole, fetchPage], + ); + + // Delta-sync a single folder — the one the user is looking at. Scoping the + // refresh to one folder avoids contending four mailbox syncs on the session's + // single IMAP connection (they would otherwise serialise on its mutex, each + // paying a full SELECT). Falls back to a full load if roles aren't resolved + // yet (refresh raced ahead of the initial load). + const refreshFolder = useCallback( + async (role: string): Promise => { + if (!rolesRef.current[role]) { + await loadAll(); + return; + } + setError(null); + try { + await syncRole(role); + } catch (err) { + if ((err as { name?: string })?.name !== "AbortError") { + setError((err as Error).message || `Could not sync ${role}`); + } + } + }, + [loadAll, syncRole], + ); + + // Refresh every folder (e.g. a global "sync all"). Pages use refreshFolder for + // their own folder; this is kept for callers that genuinely want all of them. + const refresh = useCallback(async (): Promise => { + if (Object.keys(rolesRef.current).length === 0) { + await loadAll(); + return; + } + setError(null); + const roles = ["inbox", "sent", "drafts", "trash"]; + const results = await Promise.allSettled(roles.map((role) => syncRole(role))); + const failed = results + .map((s, i) => ({ s, role: roles[i] })) + .filter(({ s }) => s.status === "rejected"); + if (failed.length > 0) { + setError(`Could not sync: ${failed.map(({ role }) => role).join(", ")}`); + } + }, [loadAll, syncRole]); + const allThreads = useMemo( () => [...threads, ...sentThreads, ...drafts, ...trashedThreads], [threads, sentThreads, drafts, trashedThreads], @@ -640,6 +771,7 @@ export const DataProvider: React.FC = ({ children }) => { nextPage, prevPage, refresh, + refreshFolder, getThread, getMessages, getCachedMessages, diff --git a/src/view/moles/ThreadList.tsx b/src/view/moles/ThreadList.tsx index 31482a0..82367d2 100644 --- a/src/view/moles/ThreadList.tsx +++ b/src/view/moles/ThreadList.tsx @@ -1,7 +1,8 @@ -import React, { useEffect, useRef } from "react"; +import React, { useEffect, useRef, useState } from "react"; import { Box, CircularProgress, IconButton, Typography } from "@mui/material"; import ChevronLeftIcon from "@mui/icons-material/ChevronLeft"; import ChevronRightIcon from "@mui/icons-material/ChevronRight"; +import RefreshIcon from "@mui/icons-material/Refresh"; import { useNavigate } from "react-router-dom"; import { useVirtualizer } from "@tanstack/react-virtual"; import ThreadListItem from "./ThreadListItem"; @@ -25,9 +26,23 @@ const ThreadList = ({ onNext = undefined, onPrev = undefined, pageLoading = false, + // Optional delta-sync trigger. When provided, a refresh button is shown in the + // pager bar; it fetches only what changed since the last sync (proposal §6). + onRefresh = undefined, }) => { const navigate = useNavigate(); const parentRef = useRef(null); + const [refreshing, setRefreshing] = useState(false); + + const handleRefresh = async () => { + if (!onRefresh || refreshing) return; + setRefreshing(true); + try { + await onRefresh(); + } finally { + setRefreshing(false); + } + }; // Jump back to the top of the list whenever the page changes, so a new page // starts at its first message rather than wherever the previous one scrolled. @@ -74,6 +89,8 @@ const ThreadList = ({ // Pager math. start/end are 1-based and reflect the rows actually shown. const showPager = !!(onNext || onPrev) && total > 0; + // The top bar appears for the pager and/or the refresh button. + const showBar = showPager || !!onRefresh; const start = page * pageSize + 1; const end = page * pageSize + threads.length; const canPrev = page > 0 && !pageLoading; @@ -88,7 +105,7 @@ const ThreadList = ({ backgroundColor: "background.paper", }} > - {showPager && ( + {showBar && ( - {pageLoading && } - - {start.toLocaleString()}–{end.toLocaleString()} of{" "} - {total.toLocaleString()} - - onPrev && onPrev()} - > - - - onNext && onNext()} - > - - + {onRefresh && ( + + {refreshing ? : } + + )} + {showPager && ( + <> + {pageLoading && } + + {start.toLocaleString()}–{end.toLocaleString()} of{" "} + {total.toLocaleString()} + + onPrev && onPrev()} + > + + + onNext && onNext()} + > + + + + )} )} diff --git a/src/view/pages/InboxPage.tsx b/src/view/pages/InboxPage.tsx index 7ed3dc1..f385769 100644 --- a/src/view/pages/InboxPage.tsx +++ b/src/view/pages/InboxPage.tsx @@ -8,7 +8,7 @@ import FloatingActionButton from "../atoms/FloatingActionButton"; import { useData } from "../../nonview/core/DataContext"; function InboxPage() { - const { threads, loading, page, total, pageSize, pageLoading, nextPage, prevPage } = + const { threads, loading, page, total, pageSize, pageLoading, nextPage, prevPage, refreshFolder } = useData(); const navigate = useNavigate(); const [searchQuery, setSearchQuery] = useState(""); @@ -42,6 +42,7 @@ function InboxPage() { pageLoading={pageLoading.inbox} onNext={searchQuery ? undefined : () => nextPage("inbox")} onPrev={searchQuery ? undefined : () => prevPage("inbox")} + onRefresh={searchQuery ? undefined : () => refreshFolder("inbox")} /> nextPage("sent")} onPrev={searchQuery ? undefined : () => prevPage("sent")} + onRefresh={searchQuery ? undefined : () => refreshFolder("sent")} /> ); diff --git a/src/view/pages/TrashPage.tsx b/src/view/pages/TrashPage.tsx index 35f3541..bc85621 100644 --- a/src/view/pages/TrashPage.tsx +++ b/src/view/pages/TrashPage.tsx @@ -14,6 +14,7 @@ function TrashPage() { pageLoading, nextPage, prevPage, + refreshFolder, } = useData(); const [searchQuery, setSearchQuery] = useState(""); @@ -46,6 +47,7 @@ function TrashPage() { pageLoading={pageLoading.trash} onNext={searchQuery ? undefined : () => nextPage("trash")} onPrev={searchQuery ? undefined : () => prevPage("trash")} + onRefresh={searchQuery ? undefined : () => refreshFolder("trash")} /> ); From 1ec56ea435b85042b1071f86faef97574bd1a6da Mon Sep 17 00:00:00 2001 From: Aravinda-HWK Date: Mon, 22 Jun 2026 11:19:31 +0530 Subject: [PATCH 2/2] =?UTF-8?q?feat:=20realtime=20push=20(IMAP=20IDLE=20?= =?UTF-8?q?=E2=86=92=20SSE)=20+=20delta-sync=20latency=20fixes?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Phase 4 of the proposal: new mail appears without a manual refresh. Realtime pipeline: - imap.Client.Watch: run IMAP IDLE on a dedicated connection, forwarding mailbox/message/expunge updates via a callback (go-imap handles the capability check, periodic restart, and polling fallback). - realtime.Hub: per-session fan-out to SSE subscribers with ref-counted IDLE watchers (N tabs on one folder share one connection) and capped backoff reconnect. - GET /api/v1/events SSE handler: clears the write deadline so the stream isn't killed by WriteTimeout; 25s heartbeat that also keeps the session warm. - RequireSession also accepts the JWT as an access_token query param, since the browser EventSource API cannot set an Authorization header. - Client opens one EventSource watching the inbox and runs a delta sync on each push; a green "Live" indicator shows the stream is connected. Latency / correctness fixes surfaced during end-to-end testing on Gmail: - selectMailbox now tracks read-only vs read-write mode and re-SELECTs when a write follows a read-only SELECT, fixing a "STORE on READ-ONLY folder" 502 storm on mark-as-read. - Keep the session's IMAP connection warm (keepalive NOOP on each sweep) and skip the per-request liveness NOOP within a freshness window, moving the ~1.5s reconnect cost off the user-facing sync path. - Delta sync fetches the newest messages by sequence number in one round trip instead of SEARCH-then-FETCH, removing a round trip. Measured warm-path delta sync on Gmail dropped from ~3.5s to ~1.5s. --- server/cmd/server/main.go | 3 + server/internal/http/handlers/events.go | 131 ++++++++++++++ server/internal/http/middleware/auth.go | 10 +- server/internal/http/router.go | 7 + server/internal/imap/client.go | 182 ++++++++++++++++--- server/internal/realtime/hub.go | 225 ++++++++++++++++++++++++ server/internal/realtime/hub_test.go | 122 +++++++++++++ server/internal/session/store.go | 44 +++++ src/nonview/api/client.ts | 11 ++ src/nonview/core/DataContext.tsx | 67 +++++++ src/view/moles/ThreadList.tsx | 52 ++++-- src/view/pages/InboxPage.tsx | 3 +- 12 files changed, 817 insertions(+), 40 deletions(-) create mode 100644 server/internal/http/handlers/events.go create mode 100644 server/internal/realtime/hub.go create mode 100644 server/internal/realtime/hub_test.go diff --git a/server/cmd/server/main.go b/server/cmd/server/main.go index 359af97..44e2b49 100644 --- a/server/cmd/server/main.go +++ b/server/cmd/server/main.go @@ -16,6 +16,7 @@ import ( "quicksilver/server/internal/config" apihttp "quicksilver/server/internal/http" applog "quicksilver/server/internal/log" + "quicksilver/server/internal/realtime" "quicksilver/server/internal/session" "quicksilver/server/internal/smtp" ) @@ -60,6 +61,7 @@ func main() { } sessions := session.NewStore(bgCtx, sealer, cfg.SessionIdleTTL, cfg.SessionSweepInt, cfg.IMAPTimeout, logger) sender := smtp.New(cfg.SMTPTimeout) + hub := realtime.NewHub(logger) router := apihttp.NewRouter(apihttp.Deps{ Config: cfg, @@ -70,6 +72,7 @@ func main() { Issuer: issuer, Sealer: sealer, Sender: sender, + Hub: hub, RateLimitCtx: bgCtx, }) diff --git a/server/internal/http/handlers/events.go b/server/internal/http/handlers/events.go new file mode 100644 index 0000000..5c60576 --- /dev/null +++ b/server/internal/http/handlers/events.go @@ -0,0 +1,131 @@ +package handlers + +import ( + "context" + "encoding/json" + "fmt" + "log/slog" + "net/http" + "strings" + "time" + + "quicksilver/server/internal/http/middleware" + "quicksilver/server/internal/httpx" + "quicksilver/server/internal/imap" + "quicksilver/server/internal/realtime" + "quicksilver/server/internal/session" +) + +// Events serves the realtime change-notification stream over Server-Sent Events +// (proposal §6, Phase 4). The browser opens one EventSource; the gateway holds +// an IMAP IDLE connection per watched mailbox and pushes a tiny "changed" event +// whenever that mailbox changes, prompting the client to run a delta sync. +type Events struct { + Sessions *session.Store + Hub *realtime.Hub + Logger *slog.Logger + Heartbeat time.Duration // comment-ping interval to keep the stream alive +} + +// Stream is GET /api/v1/events. Auth is via the standard bearer token, which +// RequireSession also accepts as an access_token query param because the +// browser EventSource API cannot set request headers. +// +// Query params: +// - mailbox: comma-separated mailbox names to watch (default INBOX) +func (h *Events) Stream(w http.ResponseWriter, r *http.Request) { + sess, ok := middleware.SessionFrom(r.Context()) + if !ok { + httpx.WriteError(w, r, h.Logger, httpx.NewAPIError(http.StatusUnauthorized, httpx.CodeUnauthorized, "no session", nil)) + return + } + + mailboxes := parseMailboxes(r.URL.Query().Get("mailbox")) + if len(mailboxes) == 0 { + mailboxes = []string{"INBOX"} + } + + rc := http.NewResponseController(w) + // SSE streams stay open far longer than the server's WriteTimeout; clear the + // per-connection write deadline so it isn't killed mid-stream. + if err := rc.SetWriteDeadline(time.Time{}); err != nil { + h.Logger.Warn("events: clear write deadline", "err", err) + } + + h2 := w.Header() + h2.Set("Content-Type", "text/event-stream") + h2.Set("Cache-Control", "no-cache") + h2.Set("Connection", "keep-alive") + h2.Set("X-Accel-Buffering", "no") // disable proxy buffering (nginx) + w.WriteHeader(http.StatusOK) + + dial := func(ctx context.Context) (*imap.Client, error) { + return h.Sessions.DialIMAP(ctx, sess) + } + sub, cleanup := h.Hub.Subscribe(sess.ID, dial, mailboxes) + defer cleanup() + + // Open the stream with a comment so the browser fires `onopen` promptly. + if _, err := fmt.Fprint(w, ": connected\n\n"); err != nil { + return + } + _ = rc.Flush() + + heartbeat := h.Heartbeat + if heartbeat <= 0 { + heartbeat = 25 * time.Second + } + hb := time.NewTicker(heartbeat) + defer hb.Stop() + + ctx := r.Context() + for { + select { + case <-ctx.Done(): + return + case ev := <-sub.C: + b, err := json.Marshal(ev) + if err != nil { + continue + } + if _, err := fmt.Fprintf(w, "event: changed\ndata: %s\n\n", b); err != nil { + return + } + if err := rc.Flush(); err != nil { + return + } + case <-hb.C: + // Keep the session warm while the user is actively connected, and + // keep idle proxies from closing the stream. + sess.Touch() + if _, err := fmt.Fprint(w, ": ping\n\n"); err != nil { + return + } + if err := rc.Flush(); err != nil { + return + } + } + } +} + +// parseMailboxes splits a comma-separated mailbox list, trimming blanks and +// capping the count so one connection can't spin up an unbounded number of IDLE +// connections. +func parseMailboxes(s string) []string { + if s == "" { + return nil + } + parts := strings.Split(s, ",") + out := make([]string, 0, len(parts)) + for _, p := range parts { + p = strings.TrimSpace(p) + if p == "" { + continue + } + out = append(out, p) + if len(out) >= 8 { + break + } + } + return out +} diff --git a/server/internal/http/middleware/auth.go b/server/internal/http/middleware/auth.go index 4ce76fc..b63c779 100644 --- a/server/internal/http/middleware/auth.go +++ b/server/internal/http/middleware/auth.go @@ -49,8 +49,12 @@ func RequireSession(issuer *auth.Issuer, store *session.Store) func(http.Handler func bearerToken(r *http.Request) string { h := r.Header.Get("Authorization") const prefix = "Bearer " - if !strings.HasPrefix(h, prefix) { - return "" + if strings.HasPrefix(h, prefix) { + return strings.TrimSpace(h[len(prefix):]) } - return strings.TrimSpace(h[len(prefix):]) + // Fall back to the access_token query param. The browser EventSource API + // (used by the SSE /events stream) cannot set an Authorization header, so it + // passes the token in the query string. Safe here because the access log + // records only r.URL.Path, never the raw query. + return strings.TrimSpace(r.URL.Query().Get("access_token")) } diff --git a/server/internal/http/router.go b/server/internal/http/router.go index 50eaee5..46c1b89 100644 --- a/server/internal/http/router.go +++ b/server/internal/http/router.go @@ -11,6 +11,7 @@ import ( "quicksilver/server/internal/config" "quicksilver/server/internal/http/handlers" "quicksilver/server/internal/http/middleware" + "quicksilver/server/internal/realtime" "quicksilver/server/internal/session" "quicksilver/server/internal/smtp" ) @@ -25,6 +26,7 @@ type Deps struct { Issuer *auth.Issuer Sealer *session.Sealer Sender *smtp.Sender + Hub *realtime.Hub // RateLimitCtx scopes the rate limiter's background reaper. When the // context is cancelled, the limiter stops reaping idle IP entries. RateLimitCtx context.Context @@ -47,6 +49,7 @@ func NewRouter(d Deps) http.Handler { authH := &handlers.Auth{Sessions: d.Sessions, Issuer: d.Issuer, Logger: d.Logger} mboxH := &handlers.Mailboxes{Sessions: d.Sessions, Logger: d.Logger} msgH := &handlers.Messages{Sessions: d.Sessions, Sealer: d.Sealer, Sender: d.Sender, Logger: d.Logger} + eventsH := &handlers.Events{Sessions: d.Sessions, Hub: d.Hub, Logger: d.Logger} requireSession := middleware.RequireSession(d.Issuer, d.Sessions) @@ -68,6 +71,10 @@ func NewRouter(d Deps) http.Handler { r.Patch("/mailboxes/{mailbox}/messages/{uid}/flags", msgH.SetFlags) r.Delete("/mailboxes/{mailbox}/messages/{uid}", msgH.Delete) r.Post("/messages", msgH.Send) + + // Realtime change stream (SSE). RequireSession also accepts the JWT + // as an access_token query param since EventSource can't set headers. + r.Get("/events", eventsH.Stream) }) }) diff --git a/server/internal/imap/client.go b/server/internal/imap/client.go index 9dbcb4a..c856723 100644 --- a/server/internal/imap/client.go +++ b/server/internal/imap/client.go @@ -34,7 +34,12 @@ type Client struct { timeout time.Duration logger *slog.Logger conn *client.Client - selected string // currently SELECTed mailbox (case-sensitive on the wire) + lastOK time.Time // when the connection was last known good (see ensureLive) + selected string // currently SELECTed mailbox (case-sensitive on the wire) + // selectedRO records whether the current SELECT is read-only. A write + // operation (STORE/MOVE) after a read-only SELECT of the same mailbox must + // re-SELECT read-write, or the server rejects it ("STORE on READ-ONLY"). + selectedRO bool } // New dials the IMAP server and authenticates. The returned client owns the @@ -79,16 +84,53 @@ func (c *Client) connect(ctx context.Context) error { // ensureLive returns the current connection, reconnecting on noop failure. // // Caller must hold c.mu. +// connFreshFor is how long after a known-good use we trust the connection +// without re-probing it. A NOOP is a full round-trip to the server; skipping it +// on back-to-back operations (e.g. a realtime delta sync) noticeably cuts +// latency against high-RTT providers like Gmail. This is kept longer than the +// session keepalive interval (which NOOPs every live connection on each sweep) +// so a warm connection's NOOP/reconnect cost is paid by the background sweep, +// never on the user-facing sync path. A connection that dies inside the window +// surfaces as an error on the next command, which then heals. +const connFreshFor = 90 * time.Second + +// Keepalive issues a NOOP to keep the connection warm and refresh its +// liveness timestamp. Unlike ensureLive it does not reconnect on failure — it +// just drops the dead connection so the next real operation re-establishes it. +// Intended to be called periodically by the session sweeper. +func (c *Client) Keepalive(ctx context.Context) error { + c.mu.Lock() + defer c.mu.Unlock() + if c.conn == nil { + return nil // nothing to keep warm; created lazily on next use + } + if err := c.conn.Noop(); err != nil { + _ = c.conn.Logout() + c.conn = nil + return err + } + c.lastOK = time.Now() + return nil +} + func (c *Client) ensureLive(ctx context.Context) error { if c.conn != nil { + if time.Since(c.lastOK) < connFreshFor { + return nil + } if err := c.conn.Noop(); err == nil { + c.lastOK = time.Now() return nil } // Connection looks dead — close and fall through to reconnect. _ = c.conn.Logout() c.conn = nil } - return c.connect(ctx) + if err := c.connect(ctx); err != nil { + return err + } + c.lastOK = time.Now() + return nil } // Close logs out and closes the underlying connection. Safe to call once. @@ -161,7 +203,10 @@ func mailboxFromInfo(info *imap.MailboxInfo) hmail.Mailbox { } func (c *Client) selectMailbox(name string, readOnly bool) error { - if c.selected == name { + // Reuse the existing SELECT only if it also satisfies the required access + // mode. A read-write SELECT can serve a read-only request, but a read-only + // SELECT cannot serve a read-write one (the server rejects STORE/MOVE). + if c.selected == name && (readOnly || !c.selectedRO) { return nil } _, err := c.conn.Select(name, readOnly) @@ -170,6 +215,7 @@ func (c *Client) selectMailbox(name string, readOnly bool) error { return fmt.Errorf("select %q: %w", name, err) } c.selected = name + c.selectedRO = readOnly return nil } @@ -192,7 +238,7 @@ func (c *Client) ListMessages(ctx context.Context, mailbox string, limit int, be if err != nil { return nil, 0, 0, fmt.Errorf("select %q: %w", mailbox, err) } - c.selected = mailbox + c.selected, c.selectedRO = mailbox, true total, uidvalidity = mbox.Messages, mbox.UidValidity if total == 0 { return []hmail.Envelope{}, 0, uidvalidity, nil @@ -245,7 +291,7 @@ func (c *Client) MailboxChanges(ctx context.Context, mailbox string, knownValidi if err != nil { return delta, fmt.Errorf("select %q: %w", mailbox, err) } - c.selected = mailbox + c.selected, c.selectedRO = mailbox, true delta.UIDValidity, delta.Total = mbox.UidValidity, mbox.Messages // UIDVALIDITY changed → the client's cached UIDs no longer identify the same @@ -264,27 +310,26 @@ func (c *Client) MailboxChanges(ctx context.Context, mailbox string, knownValidi } } - // 1. Added — UIDs in (sinceUID+1):*, capped to the newest `limit`. - if mbox.Messages > 0 && sinceUID < ^uint32(0) { - crit := imap.NewSearchCriteria() - seq := new(imap.SeqSet) - seq.AddRange(sinceUID+1, 0) // "(sinceUID+1):*" — 0 means "*" - crit.Uid = seq - newUIDs, err := c.conn.UidSearch(crit) - if err != nil { - return delta, fmt.Errorf("uid search added: %w", err) + // 1. Added — fetch the newest `limit` messages by sequence number in a single + // round trip (no preceding SEARCH), then keep only those strictly newer than + // the client's watermark. Bounded by `limit` regardless of backlog size. + if mbox.Messages > 0 { + low := uint32(1) + if mbox.Messages > uint32(limit) { + low = mbox.Messages - uint32(limit) + 1 } - if len(newUIDs) > limit { - newUIDs = newUIDs[len(newUIDs)-limit:] + recent, err := c.fetchEnvelopesSeq(low, mbox.Messages) + if err != nil { + return delta, err } - if len(newUIDs) > 0 { - added, err := c.fetchEnvelopes(newUIDs) - if err != nil { - return delta, err + added := make([]hmail.Envelope, 0, len(recent)) + for _, e := range recent { + if e.UID > sinceUID { + added = append(added, e) } - reverseEnvelopes(added) // newest-first, matching ListMessages - delta.Added = added } + reverseEnvelopes(added) // seq-ascending (oldest-first) → newest-first + delta.Added = added } // 2. Flags + removals among the known set. A known UID absent from the @@ -325,6 +370,28 @@ func (c *Client) fetchEnvelopes(uids []uint32) ([]hmail.Envelope, error) { return out, nil } +// fetchEnvelopesSeq fetches list-view envelopes for the inclusive +// sequence-number range [low, high] in ascending order. Used by the delta sync +// to grab the newest messages in one round trip without a preceding SEARCH. +// Caller must hold c.mu and have the mailbox SELECTed. +func (c *Client) fetchEnvelopesSeq(low, high uint32) ([]hmail.Envelope, error) { + seq := new(imap.SeqSet) + seq.AddRange(low, high) + msgs := make(chan *imap.Message, high-low+1) + done := make(chan error, 1) + items := []imap.FetchItem{imap.FetchEnvelope, imap.FetchFlags, imap.FetchUid, imap.FetchBodyStructure} + go func() { done <- c.conn.Fetch(seq, items, msgs) }() + + var out []hmail.Envelope + for m := range msgs { + out = append(out, envelopeFrom(m)) + } + if err := <-done; err != nil { + return nil, fmt.Errorf("fetch envelopes seq: %w", err) + } + return out, nil +} + // fetchFlags fetches only the flags for the given UIDs, returned as a uid→flags // map. UIDs that no longer exist are simply omitted from the result. Caller must // hold c.mu and have the mailbox SELECTed. @@ -444,6 +511,77 @@ func (c *Client) GetMessage(ctx context.Context, mailbox string, uid uint32) (*h // ErrNotFound is returned when an operation cannot locate the requested item. var ErrNotFound = errors.New("not found") +// Watch SELECTs mailbox (read-only) and blocks in IMAP IDLE, invoking onChange +// whenever the server reports activity in that mailbox — a new message, an +// expunge, or a flag change. It returns when ctx is cancelled (ctx.Err()) or the +// connection fails; callers are expected to reconnect on a non-nil error. +// +// If the server lacks the IDLE capability, go-imap transparently falls back to +// polling, so onChange still fires (just less promptly). Watch monopolises the +// connection for its entire lifetime, so it MUST run on a dedicated Client — +// never the one serving request/response API traffic (see Store.DialIMAP). +func (c *Client) Watch(ctx context.Context, mailbox string, onChange func()) error { + c.mu.Lock() + defer c.mu.Unlock() + if err := c.ensureLive(ctx); err != nil { + return err + } + if _, err := c.conn.Select(mailbox, true); err != nil { + c.selected = "" + return fmt.Errorf("select %q: %w", mailbox, err) + } + c.selected, c.selectedRO = mailbox, true + + // Diagnostic: confirm we're using real push (IDLE) and not go-imap's polling + // fallback. Polling shows up as detections spaced exactly one PollInterval + // apart, which masquerades as "slow to detect new mail". + idleOK, capErr := c.conn.Support("IDLE") + c.logger.Info("imap watch: starting", "mailbox", mailbox, "idle_supported", idleOK, "cap_err", capErr) + + // IDLE blocks far longer than a normal command; the per-command deadline + // go-imap applies from c.Timeout would abort the wait. Disable it for the + // lifetime of this watch (the connection is dedicated and short-lived). + c.conn.Timeout = 0 + + updates := make(chan client.Update, 16) + c.conn.Updates = updates + defer func() { c.conn.Updates = nil }() + + stop := make(chan struct{}) + done := make(chan error, 1) + go func() { + // LogoutTimeout restarts IDLE before the server's inactivity cutoff; + // PollInterval is used only when the server lacks IDLE. + done <- c.conn.Idle(stop, &client.IdleOptions{ + LogoutTimeout: 25 * time.Minute, + PollInterval: 10 * time.Second, // safety net if the server lacks IDLE + }) + }() + + for { + select { + case <-ctx.Done(): + close(stop) + <-done // let Idle unwind cleanly before the caller closes the conn + return ctx.Err() + case err := <-done: + // Idle returned on its own — the connection broke or the server + // closed it. Surface so the caller reconnects. + if err != nil { + return err + } + return errors.New("imap idle ended unexpectedly") + case u := <-updates: + switch u.(type) { + case *client.MailboxUpdate, *client.MessageUpdate, *client.ExpungeUpdate: + if onChange != nil { + onChange() + } + } + } + } +} + func parseRFC822(r io.Reader) (*hmail.Message, error) { mr, err := gomail.CreateReader(r) if err != nil { diff --git a/server/internal/realtime/hub.go b/server/internal/realtime/hub.go new file mode 100644 index 0000000..8449cac --- /dev/null +++ b/server/internal/realtime/hub.go @@ -0,0 +1,225 @@ +// Package realtime implements the server-push half of the sync protocol +// (proposal §6, Phase 4). For each connected client it holds one IMAP IDLE +// connection per watched mailbox; when IDLE fires, it pushes a tiny "changed" +// event to that client's SSE subscribers, which then run an incremental sync. +// +// The event carries only the mailbox name — no message data — keeping the push +// lightweight. The actual delta is pulled over the regular REST API afterwards. +// +// Concurrency: a Hub owns a sessionHub per session id. Watchers are reference +// counted by mailbox so N browser tabs watching the same folder share one IDLE +// connection, and the connection is torn down when the last subscriber leaves. +// Lock order is always Hub.mu → sessionHub.mu; no path acquires them the other +// way round. +package realtime + +import ( + "context" + "log/slog" + "sync" + "time" + + "quicksilver/server/internal/imap" +) + +// ChangeEvent is the payload pushed to the client when a watched mailbox +// changes. It deliberately carries no message data. +type ChangeEvent struct { + Mailbox string `json:"mailbox"` +} + +// Dialer opens a fresh, dedicated IMAP connection for watching. The watcher +// owns the returned client and closes it when the watch ends. +type Dialer func(ctx context.Context) (*imap.Client, error) + +// Hub fans IMAP IDLE activity out to SSE subscribers, keyed by session. +type Hub struct { + logger *slog.Logger + + mu sync.Mutex + sessions map[string]*sessionHub +} + +// NewHub constructs an empty Hub. +func NewHub(logger *slog.Logger) *Hub { + return &Hub{logger: logger, sessions: make(map[string]*sessionHub)} +} + +// Subscriber is one SSE connection. The handler ranges over C and writes each +// event to the wire; events are dropped (never block) if the consumer lags. +type Subscriber struct { + C chan ChangeEvent + mailboxes map[string]bool +} + +// Subscribe registers an SSE consumer for the given session and mailboxes, +// starting (or reusing) an IDLE watcher per mailbox. The returned cleanup func +// must be called exactly once when the SSE connection ends; it decrements the +// watchers and tears down any that reach zero subscribers. +func (h *Hub) Subscribe(sessionID string, dial Dialer, mailboxes []string) (*Subscriber, func()) { + h.mu.Lock() + sh := h.sessions[sessionID] + if sh == nil { + sh = &sessionHub{ + hub: h, + id: sessionID, + dial: dial, + subs: make(map[*Subscriber]struct{}), + watchers: make(map[string]*watcher), + } + h.sessions[sessionID] = sh + } + h.mu.Unlock() + return sh.add(mailboxes) +} + +func (h *Hub) removeIfEmpty(id string) { + h.mu.Lock() + defer h.mu.Unlock() + sh, ok := h.sessions[id] + if !ok { + return + } + sh.mu.Lock() + empty := len(sh.subs) == 0 && len(sh.watchers) == 0 + sh.mu.Unlock() + if empty { + delete(h.sessions, id) + } +} + +// sessionHub holds all subscribers and watchers for a single session. +type sessionHub struct { + hub *Hub + id string + dial Dialer + + mu sync.Mutex + subs map[*Subscriber]struct{} + watchers map[string]*watcher +} + +type watcher struct { + refs int + cancel context.CancelFunc +} + +func (sh *sessionHub) add(mailboxes []string) (*Subscriber, func()) { + sub := &Subscriber{C: make(chan ChangeEvent, 8), mailboxes: make(map[string]bool)} + + sh.mu.Lock() + sh.subs[sub] = struct{}{} + for _, m := range mailboxes { + if m == "" || sub.mailboxes[m] { + continue + } + sub.mailboxes[m] = true + w := sh.watchers[m] + if w == nil { + ctx, cancel := context.WithCancel(context.Background()) + w = &watcher{cancel: cancel} + sh.watchers[m] = w + go sh.runWatcher(ctx, m) + } + w.refs++ + } + sh.mu.Unlock() + + var once sync.Once + cleanup := func() { + once.Do(func() { + sh.mu.Lock() + delete(sh.subs, sub) + for m := range sub.mailboxes { + if w := sh.watchers[m]; w != nil { + w.refs-- + if w.refs <= 0 { + w.cancel() + delete(sh.watchers, m) + } + } + } + sh.mu.Unlock() + // We never close sub.C: broadcast only sends to subs still in the + // map (under sh.mu), and we removed this one above, so no send can + // race a close. The channel is simply GC'd with the Subscriber. + sh.hub.removeIfEmpty(sh.id) + }) + } + return sub, cleanup +} + +// runWatcher keeps a dedicated IDLE connection alive for one mailbox, calling +// broadcast on every change and reconnecting (with capped backoff) until ctx is +// cancelled — the periodic safety re-sync against connection drops noted in the +// proposal's risk table. +func (sh *sessionHub) runWatcher(ctx context.Context, mailbox string) { + const ( + minBackoff = 1 * time.Second + maxBackoff = 30 * time.Second + ) + backoff := minBackoff + for { + if ctx.Err() != nil { + return + } + c, err := sh.dial(ctx) + if err != nil { + sh.hub.logger.Warn("realtime: dial failed", "session_id", sh.id, "mailbox", mailbox, "err", err) + if !sleepCtx(ctx, backoff) { + return + } + backoff = next(backoff, maxBackoff) + continue + } + backoff = minBackoff // a successful connect resets the backoff + err = c.Watch(ctx, mailbox, func() { sh.broadcast(mailbox) }) + _ = c.Close() + if ctx.Err() != nil { + return + } + sh.hub.logger.Info("realtime: watch ended, reconnecting", "session_id", sh.id, "mailbox", mailbox, "err", err) + if !sleepCtx(ctx, backoff) { + return + } + backoff = next(backoff, maxBackoff) + } +} + +func (sh *sessionHub) broadcast(mailbox string) { + ev := ChangeEvent{Mailbox: mailbox} + sh.mu.Lock() + sh.hub.logger.Info("realtime: change detected", "session_id", sh.id, "mailbox", mailbox, "subscribers", len(sh.subs)) + for sub := range sh.subs { + if !sub.mailboxes[mailbox] { + continue + } + // Non-blocking: a lagging consumer drops this signal and catches up on + // its next sync — the events are coalescible by design. + select { + case sub.C <- ev: + default: + } + } + sh.mu.Unlock() +} + +// sleepCtx sleeps for d or until ctx is cancelled. Reports false if cancelled. +func sleepCtx(ctx context.Context, d time.Duration) bool { + t := time.NewTimer(d) + defer t.Stop() + select { + case <-ctx.Done(): + return false + case <-t.C: + return true + } +} + +func next(d, max time.Duration) time.Duration { + d *= 2 + if d > max { + return max + } + return d +} diff --git a/server/internal/realtime/hub_test.go b/server/internal/realtime/hub_test.go new file mode 100644 index 0000000..0a2ff07 --- /dev/null +++ b/server/internal/realtime/hub_test.go @@ -0,0 +1,122 @@ +package realtime + +import ( + "context" + "io" + "log/slog" + "testing" + "time" + + "quicksilver/server/internal/imap" +) + +func testHub() *Hub { + return NewHub(slog.New(slog.NewTextHandler(io.Discard, nil))) +} + +// errDial never connects, so the watcher goroutine just loops in backoff and +// never calls broadcast — letting us drive broadcast manually. +func errDial(context.Context) (*imap.Client, error) { + return nil, context.Canceled +} + +func sessionHubFor(t *testing.T, h *Hub, id string) *sessionHub { + t.Helper() + h.mu.Lock() + defer h.mu.Unlock() + sh, ok := h.sessions[id] + if !ok { + t.Fatalf("no sessionHub for %q", id) + } + return sh +} + +func TestWatcherRefCountingAndBroadcast(t *testing.T) { + h := testHub() + + subA, cleanupA := h.Subscribe("sess-1", errDial, []string{"INBOX"}) + subB, cleanupB := h.Subscribe("sess-1", errDial, []string{"INBOX"}) + + sh := sessionHubFor(t, h, "sess-1") + + // One shared watcher for INBOX, referenced twice. + sh.mu.Lock() + w := sh.watchers["INBOX"] + refs := 0 + if w != nil { + refs = w.refs + } + nSubs := len(sh.subs) + sh.mu.Unlock() + if w == nil { + t.Fatal("expected an INBOX watcher") + } + if refs != 2 { + t.Fatalf("watcher refs = %d, want 2", refs) + } + if nSubs != 2 { + t.Fatalf("subs = %d, want 2", nSubs) + } + + // A broadcast reaches every subscriber watching the mailbox. + sh.broadcast("INBOX") + for name, sub := range map[string]*Subscriber{"A": subA, "B": subB} { + select { + case ev := <-sub.C: + if ev.Mailbox != "INBOX" { + t.Fatalf("sub %s got mailbox %q, want INBOX", name, ev.Mailbox) + } + case <-time.After(time.Second): + t.Fatalf("sub %s received no event", name) + } + } + + // Dropping one subscriber keeps the shared watcher alive. + cleanupA() + sh.mu.Lock() + w = sh.watchers["INBOX"] + if w == nil || w.refs != 1 { + sh.mu.Unlock() + t.Fatalf("after cleanupA: watcher should remain with refs=1") + } + sh.mu.Unlock() + + // Dropping the last subscriber tears the watcher down and evicts the session. + cleanupB() + sh.mu.Lock() + remaining := len(sh.watchers) + len(sh.subs) + sh.mu.Unlock() + if remaining != 0 { + t.Fatalf("after cleanupB: watchers+subs = %d, want 0", remaining) + } + + h.mu.Lock() + _, stillThere := h.sessions["sess-1"] + h.mu.Unlock() + if stillThere { + t.Fatal("session should be evicted once empty") + } +} + +func TestBroadcastSkipsUnwatchedMailbox(t *testing.T) { + h := testHub() + sub, cleanup := h.Subscribe("sess-2", errDial, []string{"INBOX"}) + defer cleanup() + + sh := sessionHubFor(t, h, "sess-2") + sh.broadcast("Sent") // sub watches INBOX only + + select { + case ev := <-sub.C: + t.Fatalf("unexpected event for unwatched mailbox: %q", ev.Mailbox) + case <-time.After(100 * time.Millisecond): + // good: nothing delivered + } +} + +func TestCleanupIsIdempotent(t *testing.T) { + h := testHub() + _, cleanup := h.Subscribe("sess-3", errDial, []string{"INBOX"}) + cleanup() + cleanup() // must not double-decrement or panic +} diff --git a/server/internal/session/store.go b/server/internal/session/store.go index c795e4b..af4b1eb 100644 --- a/server/internal/session/store.go +++ b/server/internal/session/store.go @@ -182,6 +182,25 @@ func (s *Store) IMAPFor(ctx context.Context, sess *Session) (*imap.Client, error return c, nil } +// DialIMAP opens a NEW, dedicated IMAP connection for the session from its +// sealed credentials. Unlike IMAPFor it does not cache the connection on the +// session: the caller owns the returned client and must Close it. Used for +// long-lived IDLE watchers, which monopolise a connection and so must not run +// on the session's shared request/response client. +func (s *Store) DialIMAP(ctx context.Context, sess *Session) (*imap.Client, error) { + sess.mu.Lock() + sealed := append([]byte(nil), sess.sealed...) + sess.mu.Unlock() + if len(sealed) == 0 { + return nil, errors.New("session has no credentials") + } + creds, err := s.sealer.Open(sealed) + if err != nil { + return nil, fmt.Errorf("unseal creds: %w", err) + } + return imap.New(ctx, creds, s.imapTimeout, s.logger) +} + // Delete removes the session and closes its IMAP connection. func (s *Store) Delete(id string) { s.mu.Lock() @@ -224,6 +243,31 @@ func (s *Store) sweepLoop(ctx context.Context) { return case <-t.C: s.sweep() + s.keepalive(ctx) + } + } +} + +// keepalive NOOPs every live session's IMAP connection so it stays warm between +// user actions. Without this, a connection idle longer than the provider's +// timeout (Gmail drops idle connections) goes cold, and the next delta sync pays +// a full TLS reconnect + LOGIN — the dominant latency on the realtime path. +func (s *Store) keepalive(ctx context.Context) { + s.mu.RLock() + sessions := make([]*Session, 0, len(s.sessions)) + for _, sess := range s.sessions { + sessions = append(sessions, sess) + } + s.mu.RUnlock() + for _, sess := range sessions { + sess.mu.Lock() + c := sess.imap + sess.mu.Unlock() + if c == nil { + continue + } + if err := c.Keepalive(ctx); err != nil { + s.logger.Debug("session keepalive failed", "session_id", sess.ID, "err", err) } } } diff --git a/src/nonview/api/client.ts b/src/nonview/api/client.ts index b4ea774..b964ff2 100644 --- a/src/nonview/api/client.ts +++ b/src/nonview/api/client.ts @@ -108,6 +108,17 @@ export class APIClient { postUnauthed(path: string, body?: unknown): Promise { return this.request("POST", path, body, { auth: false }); } + + // Builds an absolute URL for an SSE (EventSource) endpoint. The browser + // EventSource API cannot set an Authorization header, so the JWT rides along + // as the access_token query param, which the backend's RequireSession accepts. + sseURL(path: string, params: Record = {}): string { + const qs = new URLSearchParams(params); + const tok = this.getToken(); + if (tok) qs.set("access_token", tok); + const query = qs.toString(); + return `${this.baseURL}${path}${query ? `?${query}` : ""}`; + } } // Resolves the API base URL from Vite env, falling back to a same-origin /api diff --git a/src/nonview/core/DataContext.tsx b/src/nonview/core/DataContext.tsx index 792b731..c40fa34 100644 --- a/src/nonview/core/DataContext.tsx +++ b/src/nonview/core/DataContext.tsx @@ -87,6 +87,8 @@ interface DataContextValue { contacts: Participant[]; loading: boolean; error: string | null; + // True while the realtime SSE stream is connected (new mail pushes live). + realtimeConnected: boolean; unreadCount: number; // Page-based pagination keyed by role ("inbox" | "sent" | "drafts" | "trash"). // page[role] is the 0-based current page; total[role] is the mailbox's full @@ -228,6 +230,11 @@ export const DataProvider: React.FC = ({ children }) => { const [trashedThreads, setTrashedThreads] = useState([]); const [loading, setLoading] = useState(false); const [error, setError] = useState(null); + // Resolved INBOX name (provider-specific). Held in state so the realtime + // effect can (re)open the SSE stream once the mailbox list is known. + const [inboxName, setInboxName] = useState(""); + // Whether the realtime SSE stream is currently connected. + const [realtimeConnected, setRealtimeConnected] = useState(false); // Pagination bookkeeping per folder role. const [page, setPage] = useState>({}); @@ -274,6 +281,7 @@ export const DataProvider: React.FC = ({ children }) => { setMailboxList(resp.mailboxes || []); const roles = resolveRoles(resp.mailboxes || []); rolesRef.current = roles; + setInboxName(roles.inbox || ""); // Use allSettled so a failure in one folder (e.g. a provider with no // "Drafts" mailbox, or a transient upstream error) doesn't wipe out @@ -569,6 +577,64 @@ export const DataProvider: React.FC = ({ children }) => { } }, [loadAll, syncRole]); + // Keep a live reference to syncRole so the realtime effect can call the + // latest version without re-subscribing the SSE stream every time syncRole's + // identity changes (which would needlessly tear down the IMAP IDLE watcher). + const syncRoleRef = useRef(syncRole); + useEffect(() => { + syncRoleRef.current = syncRole; + }, [syncRole]); + + // Realtime push (proposal §6, Phase 4). Open one SSE stream that watches the + // inbox; when the gateway's IMAP IDLE connection reports a change it pushes a + // tiny "changed" event, and we run a delta sync for the affected folder. The + // event carries only the mailbox name — the actual delta is pulled over the + // regular sync path, so the push stays lightweight. + useEffect(() => { + if (!isAuthenticated || !inboxName) return; + + const url = apiClient.sseURL("/api/v1/events", { mailbox: inboxName }); + const es = new EventSource(url); + + // Coalesce bursts of IDLE updates (e.g. a flag change + arrival) into one + // sync per folder. + const timers: Record> = {}; + const scheduleSync = (role: string) => { + if (timers[role]) clearTimeout(timers[role]); + timers[role] = setTimeout(() => { + delete timers[role]; + void syncRoleRef.current(role); + }, 60); + }; + + const onOpen = () => setRealtimeConnected(true); + const onError = () => setRealtimeConnected(false); // EventSource auto-reconnects + const onChanged = (e: MessageEvent) => { + let mailbox = inboxName; + try { + mailbox = (JSON.parse(e.data) as { mailbox?: string }).mailbox || inboxName; + } catch { + // Malformed payload — fall back to syncing the inbox. + } + // Map the mailbox name back to a role; default to inbox. + const entry = Object.entries(rolesRef.current).find( + ([, name]) => name === mailbox, + ); + scheduleSync(entry ? entry[0] : "inbox"); + }; + + es.addEventListener("open", onOpen); + es.addEventListener("error", onError); + es.addEventListener("changed", onChanged as EventListener); + + return () => { + for (const t of Object.values(timers)) clearTimeout(t); + es.removeEventListener("changed", onChanged as EventListener); + es.close(); + setRealtimeConnected(false); + }; + }, [isAuthenticated, apiClient, inboxName]); + const allThreads = useMemo( () => [...threads, ...sentThreads, ...drafts, ...trashedThreads], [threads, sentThreads, drafts, trashedThreads], @@ -763,6 +829,7 @@ export const DataProvider: React.FC = ({ children }) => { contacts, loading, error, + realtimeConnected, unreadCount, page, total, diff --git a/src/view/moles/ThreadList.tsx b/src/view/moles/ThreadList.tsx index 82367d2..f69275d 100644 --- a/src/view/moles/ThreadList.tsx +++ b/src/view/moles/ThreadList.tsx @@ -29,6 +29,9 @@ const ThreadList = ({ // Optional delta-sync trigger. When provided, a refresh button is shown in the // pager bar; it fetches only what changed since the last sync (proposal §6). onRefresh = undefined, + // When true, a small "Live" indicator shows that the realtime SSE stream is + // connected and new mail will push in without a manual refresh (Phase 4). + live = undefined, }) => { const navigate = useNavigate(); const parentRef = useRef(null); @@ -89,8 +92,8 @@ const ThreadList = ({ // Pager math. start/end are 1-based and reflect the rows actually shown. const showPager = !!(onNext || onPrev) && total > 0; - // The top bar appears for the pager and/or the refresh button. - const showBar = showPager || !!onRefresh; + // The top bar appears for the pager, the refresh button, and/or the live dot. + const showBar = showPager || !!onRefresh || live !== undefined; const start = page * pageSize + 1; const end = page * pageSize + threads.length; const canPrev = page > 0 && !pageLoading; @@ -119,18 +122,39 @@ const ThreadList = ({ minHeight: 44, }} > - {onRefresh && ( - - {refreshing ? : } - - )} + + {onRefresh && ( + + {refreshing ? : } + + )} + {live !== undefined && ( + + + + {live ? "Live" : "Offline"} + + + )} + {showPager && ( <> {pageLoading && } diff --git a/src/view/pages/InboxPage.tsx b/src/view/pages/InboxPage.tsx index f385769..c2c7cb5 100644 --- a/src/view/pages/InboxPage.tsx +++ b/src/view/pages/InboxPage.tsx @@ -8,7 +8,7 @@ import FloatingActionButton from "../atoms/FloatingActionButton"; import { useData } from "../../nonview/core/DataContext"; function InboxPage() { - const { threads, loading, page, total, pageSize, pageLoading, nextPage, prevPage, refreshFolder } = + const { threads, loading, page, total, pageSize, pageLoading, nextPage, prevPage, refreshFolder, realtimeConnected } = useData(); const navigate = useNavigate(); const [searchQuery, setSearchQuery] = useState(""); @@ -43,6 +43,7 @@ function InboxPage() { onNext={searchQuery ? undefined : () => nextPage("inbox")} onPrev={searchQuery ? undefined : () => prevPage("inbox")} onRefresh={searchQuery ? undefined : () => refreshFolder("inbox")} + live={realtimeConnected} />