jetstream: fix reconnect logic and last time_us saving
Reconnection is now handed by cancelling the connCtx context. This is a lot cleaner and the jetstream client package honors context cancellation.
Saving of last time_us is also simplified now. We only save (update, rather) after the last seen event by incrementing the event's time_us by 1 (this is ok since it's a monotonic clock). We pick up from here upon reconnect and don't save last time_us for any other reason.
Lastly, outside of first boot, we should only ever use UpdateLastTimeUs.
Anirudh Oppiliappan 2 weeks ago 8 files (+76, -44)
MODIFIED
appview/db/jetstream.go
MODIFIED
appview/db/jetstream.go
@@ -1,8 +1,16 @@package dbfunc (d *DB) SaveLastTimeUs(lastTimeUs int64) error {- _, err := d.db.Exec(`update _jetstream set last_time_us = ?`, lastTimeUs)+ _, err := d.db.Exec(`insert into _jetstream (last_time_us) values (?)`, lastTimeUs)return err+}++func (d *DB) UpdateLastTimeUs(lastTimeUs int64) error {+ _, err := d.db.Exec(`update _jetstream set last_time_us = ? where rowid = 1`, lastTimeUs)+ if err != nil {+ return err+ }+ return nil}func (d *DB) GetLastTimeUs() (int64, error) {
MODIFIED
appview/state/middleware.go
MODIFIED
appview/state/middleware.go
@@ -154,7 +154,7 @@ log.Println(didOrHandle)id, err := s.resolver.ResolveIdent(req.Context(), didOrHandle)if err != nil {// invalid did or handle- log.Println("failed to resolve did/handle")+ log.Println("failed to resolve did/handle:", err)w.WriteHeader(http.StatusNotFound)return}
MODIFIED
appview/state/settings.go
MODIFIED
appview/state/settings.go
@@ -68,7 +68,7 @@ })// invalid recordif err != nil {log.Printf("failed to create record: %s", err)- s.pages.Notice(w, "settings-keys-bad", "Failed to create record.")+ s.pages.Notice(w, "settings-keys", "Failed to create record.")return}
MODIFIED
appview/state/state.go
MODIFIED
appview/state/state.go
@@ -8,6 +8,7 @@ "encoding/hex""encoding/json""fmt""log"+ "log/slog""net/http""strings""time"@@ -59,7 +60,7 @@ pgs := pages.NewPages()resolver := appview.NewResolver()- jc, err := jetstream.NewJetstreamClient("appview", []string{tangled.GraphFollowNSID}, nil, db, false)+ jc, err := jetstream.NewJetstreamClient("appview", []string{tangled.GraphFollowNSID}, nil, slog.Default(), db, false)if err != nil {return nil, fmt.Errorf("failed to create jetstream client: %w", err)}@@ -83,7 +84,7 @@ err = db.AddFollow(did, record.Subject, e.Commit.RKey)if err != nil {return fmt.Errorf("failed to add follow to db: %w", err)}- return db.SaveLastTimeUs(e.TimeUS)+ return db.UpdateLastTimeUs(e.TimeUS)}return nil@@ -125,6 +126,7 @@ appPassword := r.FormValue("app_password")resolved, err := s.resolver.ResolveIdent(ctx, handle)if err != nil {+ log.Println("failed to resolve handle:", err)s.pages.Notice(w, "login-msg", fmt.Sprintf("\"%s\" is an invalid handle.", handle))return}
MODIFIED
cmd/knotserver/main.go
MODIFIED
cmd/knotserver/main.go
@@ -45,7 +45,7 @@jc, err := jetstream.NewJetstreamClient("knotserver", []string{tangled.PublicKeyNSID,tangled.KnotMemberNSID,- }, nil, db, true)+ }, nil, l, db, true)if err != nil {l.Error("failed to setup jetstream", "error", err)}
MODIFIED
jetstream/jetstream.go
MODIFIED
jetstream/jetstream.go
@@ -3,6 +3,7 @@import ("context""fmt"+ "log/slog""sync""time"@@ -15,17 +16,21 @@type DB interface {GetLastTimeUs() (int64, error)SaveLastTimeUs(int64) error+ UpdateLastTimeUs(int64) error}type JetstreamClient struct {cfg *client.ClientConfigclient *client.Clientident string+ l *slog.Logger- db DB- reconnectCh chan struct{}- waitForDid bool- mu sync.RWMutex+ db DB+ waitForDid bool+ mu sync.RWMutex++ cancel context.CancelFunc+ cancelMu sync.Mutex}func (j *JetstreamClient) AddDid(did string) {@@ -35,21 +40,25 @@ }j.mu.Lock()j.cfg.WantedDids = append(j.cfg.WantedDids, did)j.mu.Unlock()- j.reconnectCh <- struct{}{}}func (j *JetstreamClient) UpdateDids(dids []string) {j.mu.Lock()for _, did := range dids {if did != "" {+ j.cfg.WantedDids = append(j.cfg.WantedDids, did)}- j.cfg.WantedDids = append(j.cfg.WantedDids, did)}j.mu.Unlock()- j.reconnectCh <- struct{}{}++ j.cancelMu.Lock()+ if j.cancel != nil {+ j.cancel()+ }+ j.cancelMu.Unlock()}-func NewJetstreamClient(ident string, collections []string, cfg *client.ClientConfig, db DB, waitForDid bool) (*JetstreamClient, error) {+func NewJetstreamClient(ident string, collections []string, cfg *client.ClientConfig, logger *slog.Logger, db DB, waitForDid bool) (*JetstreamClient, error) {if cfg == nil {cfg = client.DefaultClientConfig()cfg.WebsocketURL = "wss://jetstream1.us-west.bsky.network/subscribe"@@ -60,18 +69,18 @@ return &JetstreamClient{cfg: cfg,ident: ident,db: db,+ l: logger,// This will make the goroutine in StartJetstream wait until// cfg.WantedDids has been populated, typically using UpdateDids.- waitForDid: waitForDid,- reconnectCh: make(chan struct{}, 1),+ waitForDid: waitForDid,}, nil}// StartJetstream starts the jetstream client and processes events using the provided processFunc.// The caller is responsible for saving the last time_us to the database (just use your db.SaveLastTimeUs).func (j *JetstreamClient) StartJetstream(ctx context.Context, processFunc func(context.Context, *models.Event) error) error {- logger := log.FromContext(ctx)+ logger := j.lsched := sequential.NewScheduler(j.ident, logger, processFunc)@@ -82,38 +91,44 @@ }j.client = clientgo func() {- lastTimeUs := j.getLastTimeUs(ctx)if j.waitForDid {for len(j.cfg.WantedDids) == 0 {time.Sleep(time.Second)}}logger.Info("done waiting for did")- j.connectAndRead(ctx, &lastTimeUs)+ j.connectAndRead(ctx)}()return nil}-func (j *JetstreamClient) connectAndRead(ctx context.Context, cursor *int64) {+func (j *JetstreamClient) connectAndRead(ctx context.Context) {l := log.FromContext(ctx)for {+ cursor := j.getLastTimeUs(ctx)++ connCtx, cancel := context.WithCancel(ctx)+ j.cancelMu.Lock()+ j.cancel = cancel+ j.cancelMu.Unlock()++ if err := j.client.ConnectAndRead(connCtx, cursor); err != nil {+ l.Error("error reading jetstream", "error", err)+ }+select {- case <-j.reconnectCh:- l.Info("(re)connecting jetstream client")- j.client.Scheduler.Shutdown()- if err := j.client.ConnectAndRead(ctx, cursor); err != nil {- l.Error("error reading jetstream", "error", err)- }- default:- if err := j.client.ConnectAndRead(ctx, cursor); err != nil {- l.Error("error reading jetstream", "error", err)- }+ case <-ctx.Done():+ l.Info("context done, stopping jetstream")+ return+ case <-connCtx.Done():+ l.Info("connection context done, reconnecting")+ continue}}}-func (j *JetstreamClient) getLastTimeUs(ctx context.Context) int64 {+func (j *JetstreamClient) getLastTimeUs(ctx context.Context) *int64 {l := log.FromContext(ctx)lastTimeUs, err := j.db.GetLastTimeUs()if err != nil {@@ -121,7 +136,7 @@ l.Warn("couldn't get last time us, starting from now", "error", err)lastTimeUs = time.Now().UnixMicro()err = j.db.SaveLastTimeUs(lastTimeUs)if err != nil {- l.Error("failed to save last time us")+ l.Error("failed to save last time us", "error", err)}}@@ -129,12 +144,12 @@ // If last time is older than a week, start from nowif time.Now().UnixMicro()-lastTimeUs > 7*24*60*60*1000*1000 {lastTimeUs = time.Now().UnixMicro()l.Warn("last time us is older than a week. discarding that and starting from now")- err = j.db.SaveLastTimeUs(lastTimeUs)+ err = j.db.UpdateLastTimeUs(lastTimeUs)if err != nil {- l.Error("failed to save last time us")+ l.Error("failed to save last time us", "error", err)}}l.Info("found last time_us", "time_us", lastTimeUs)- return lastTimeUs+ return &lastTimeUs}
MODIFIED
knotserver/db/jetstream.go
MODIFIED
knotserver/db/jetstream.go
@@ -1,8 +1,16 @@package dbfunc (d *DB) SaveLastTimeUs(lastTimeUs int64) error {- _, err := d.db.Exec(`update _jetstream set last_time_us = ?`, lastTimeUs)+ _, err := d.db.Exec(`insert into _jetstream (last_time_us) values (?)`, lastTimeUs)return err+}++func (d *DB) UpdateLastTimeUs(lastTimeUs int64) error {+ _, err := d.db.Exec(`update _jetstream set last_time_us = ? where rowid = 1`, lastTimeUs)+ if err != nil {+ return err+ }+ return nil}func (d *DB) GetLastTimeUs() (int64, error) {
MODIFIED
knotserver/jetstream.go
MODIFIED
knotserver/jetstream.go
@@ -29,7 +29,7 @@ l.Info("added public key from firehose", "did", did)return nil}-func (h *Handle) processKnotMember(ctx context.Context, did string, record tangled.KnotMember) error {+func (h *Handle) processKnotMember(ctx context.Context, did string, record tangled.KnotMember, eventTime int64) error {l := log.FromContext(ctx)if record.Domain != h.c.Server.Hostname {@@ -43,7 +43,6 @@ l.Error("failed to add member", "did", did)return fmt.Errorf("failed to enforce permissions: %w", err)}- l.Info("adding member")if err := h.e.AddMember(ThisServer, record.Member); err != nil {l.Error("failed to add member", "error", err)return fmt.Errorf("failed to add member: %w", err)@@ -59,6 +58,11 @@ if err := h.fetchAndAddKeys(ctx, did); err != nil {return fmt.Errorf("failed to fetch and add keys: %w", err)}+ lastTimeUs := eventTime + 1+ fmt.Println("lastTimeUs", lastTimeUs)+ if err := h.db.UpdateLastTimeUs(lastTimeUs); err != nil {+ return fmt.Errorf("failed to save last time us: %w", err)+ }h.jc.UpdateDids([]string{did})return nil}@@ -129,14 +133,9 @@ var record tangled.KnotMemberif err := json.Unmarshal(raw, &record); err != nil {return fmt.Errorf("failed to unmarshal record: %w", err)}- if err := h.processKnotMember(ctx, did, record); err != nil {+ if err := h.processKnotMember(ctx, did, record, event.TimeUS); err != nil {return fmt.Errorf("failed to process knot member: %w", err)}- }-- lastTimeUs := event.TimeUS- if err := h.db.SaveLastTimeUs(lastTimeUs); err != nil {- return fmt.Errorf("failed to save last time us: %w", err)}return nil