157 lines
3.8 kB
1
package jetstream
2
3
import (
4
"context"
5
"fmt"
6
"log/slog"
7
"sync"
8
"time"
9
10
"github.com/bluesky-social/jetstream/pkg/client"
11
"github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential"
12
"github.com/bluesky-social/jetstream/pkg/models"
13
"github.com/sotangled/tangled/log"
14
)
15
16
type DB interface {
17
GetLastTimeUs() (int64, error)
18
SaveLastTimeUs(int64) error
19
UpdateLastTimeUs(int64) error
20
}
21
22
type JetstreamClient struct {
23
cfg *client.ClientConfig
24
client *client.Client
25
ident string
26
l *slog.Logger
27
28
db DB
29
waitForDid bool
30
mu sync.RWMutex
31
32
cancel context.CancelFunc
33
cancelMu sync.Mutex
34
}
35
36
func (j *JetstreamClient) AddDid(did string) {
37
if did == "" {
38
return
39
}
40
j.mu.Lock()
41
j.cfg.WantedDids = append(j.cfg.WantedDids, did)
42
j.mu.Unlock()
43
}
44
45
func (j *JetstreamClient) UpdateDids(dids []string) {
46
j.mu.Lock()
47
for _, did := range dids {
48
if did != "" {
49
j.cfg.WantedDids = append(j.cfg.WantedDids, did)
50
}
51
}
52
j.mu.Unlock()
53
54
j.cancelMu.Lock()
55
if j.cancel != nil {
56
j.cancel()
57
}
58
j.cancelMu.Unlock()
59
}
60
61
func NewJetstreamClient(ident string, collections []string, cfg *client.ClientConfig, logger *slog.Logger, db DB, waitForDid bool) (*JetstreamClient, error) {
62
if cfg == nil {
63
cfg = client.DefaultClientConfig()
64
cfg.WebsocketURL = "wss://jetstream1.us-west.bsky.network/subscribe"
65
cfg.WantedCollections = collections
66
}
67
68
return &JetstreamClient{
69
cfg: cfg,
70
ident: ident,
71
db: db,
72
l: logger,
73
74
// This will make the goroutine in StartJetstream wait until
75
// cfg.WantedDids has been populated, typically using UpdateDids.
76
waitForDid: waitForDid,
77
}, nil
78
}
79
80
// StartJetstream starts the jetstream client and processes events using the provided processFunc.
81
// The caller is responsible for saving the last time_us to the database (just use your db.SaveLastTimeUs).
82
func (j *JetstreamClient) StartJetstream(ctx context.Context, processFunc func(context.Context, *models.Event) error) error {
83
logger := j.l
84
85
sched := sequential.NewScheduler(j.ident, logger, processFunc)
86
87
client, err := client.NewClient(j.cfg, log.New("jetstream"), sched)
88
if err != nil {
89
return fmt.Errorf("failed to create jetstream client: %w", err)
90
}
91
j.client = client
92
93
go func() {
94
if j.waitForDid {
95
for len(j.cfg.WantedDids) == 0 {
96
time.Sleep(time.Second)
97
}
98
}
99
logger.Info("done waiting for did")
100
j.connectAndRead(ctx)
101
}()
102
103
return nil
104
}
105
106
func (j *JetstreamClient) connectAndRead(ctx context.Context) {
107
l := log.FromContext(ctx)
108
for {
109
cursor := j.getLastTimeUs(ctx)
110
111
connCtx, cancel := context.WithCancel(ctx)
112
j.cancelMu.Lock()
113
j.cancel = cancel
114
j.cancelMu.Unlock()
115
116
if err := j.client.ConnectAndRead(connCtx, cursor); err != nil {
117
l.Error("error reading jetstream", "error", err)
118
cancel()
119
continue
120
}
121
122
select {
123
case <-ctx.Done():
124
l.Info("context done, stopping jetstream")
125
return
126
case <-connCtx.Done():
127
l.Info("connection context done, reconnecting")
128
continue
129
}
130
}
131
}
132
133
func (j *JetstreamClient) getLastTimeUs(ctx context.Context) *int64 {
134
l := log.FromContext(ctx)
135
lastTimeUs, err := j.db.GetLastTimeUs()
136
if err != nil {
137
l.Warn("couldn't get last time us, starting from now", "error", err)
138
lastTimeUs = time.Now().UnixMicro()
139
err = j.db.SaveLastTimeUs(lastTimeUs)
140
if err != nil {
141
l.Error("failed to save last time us", "error", err)
142
}
143
}
144
145
// If last time is older than a week, start from now
146
if time.Now().UnixMicro()-lastTimeUs > 2*24*60*60*1000*1000 {
147
lastTimeUs = time.Now().UnixMicro()
148
l.Warn("last time us is older than 2 days; discarding that and starting from now")
149
err = j.db.UpdateLastTimeUs(lastTimeUs)
150
if err != nil {
151
l.Error("failed to save last time us", "error", err)
152
}
153
}
154
155
l.Info("found last time_us", "time_us", lastTimeUs)
156
return &lastTimeUs
157
}
158