144 lines
3.6 kB
1
package jetstream
2
3
import (
4
"context"
5
"fmt"
6
"sync"
7
"time"
8
9
"github.com/bluesky-social/jetstream/pkg/client"
10
"github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential"
11
"github.com/bluesky-social/jetstream/pkg/models"
12
"github.com/sotangled/tangled/log"
13
)
14
15
// DB is the storage dependency of JetstreamClient. It holds a single
// microsecond timestamp ("time_us") that this package uses as the point from
// which to resume reading the stream.
type DB interface {
	// GetLastTimeUs returns the most recently stored time_us value.
	GetLastTimeUs() (int64, error)

	// SaveLastTimeUs stores timeUs as the new time_us value.
	SaveLastTimeUs(timeUs int64) error
}
19
20
type JetstreamClient struct {
21
cfg *client.ClientConfig
22
client *client.Client
23
ident string
24
25
db DB
26
reconnectCh chan struct{}
27
waitForDid bool
28
mu sync.RWMutex
29
}
30
31
// AddDid appends did to the list of wanted DIDs and signals the read loop
// that the subscription should be re-established.
//
// The call never blocks on the signal: reconnectCh has room for one pending
// signal, and a signal that is already pending will cause the next connection
// to pick up this change as well, so an extra one is simply dropped.
func (j *JetstreamClient) AddDid(did string) {
	j.mu.Lock()
	j.cfg.WantedDids = append(j.cfg.WantedDids, did)
	j.mu.Unlock()

	// Non-blocking send. A plain send would stall the caller whenever a
	// signal is still waiting to be consumed (for example when AddDid is
	// called twice before the read loop gets to look at the channel).
	select {
	case j.reconnectCh <- struct{}{}:
	default:
	}
}
37
38
// UpdateDids replaces the list of wanted DIDs with a copy of dids and signals
// the read loop that the subscription should be re-established.
//
// The call never blocks on the signal: reconnectCh has room for one pending
// signal, and a signal that is already pending will cause the next connection
// to pick up this change as well, so an extra one is simply dropped.
func (j *JetstreamClient) UpdateDids(dids []string) {
	// Copy so that a later append in AddDid cannot write into the caller's
	// backing array, and so that the caller mutating dids afterwards does not
	// change the configuration behind the mutex's back.
	wanted := append([]string(nil), dids...)

	j.mu.Lock()
	j.cfg.WantedDids = wanted
	j.mu.Unlock()

	// Non-blocking send. A plain send would stall the caller whenever a
	// signal is still waiting to be consumed.
	select {
	case j.reconnectCh <- struct{}{}:
	default:
	}
}
44
45
func NewJetstreamClient(ident string, collections []string, cfg *client.ClientConfig, db DB, waitForDid bool) (*JetstreamClient, error) {
46
if cfg == nil {
47
cfg = client.DefaultClientConfig()
48
cfg.WebsocketURL = "wss://jetstream1.us-west.bsky.network/subscribe"
49
cfg.WantedCollections = collections
50
}
51
52
return &JetstreamClient{
53
cfg: cfg,
54
ident: ident,
55
db: db,
56
57
// This will make the goroutine in StartJetstream wait until
58
// cfg.WantedDids has been populated, typically using UpdateDids.
59
waitForDid: waitForDid,
60
reconnectCh: make(chan struct{}, 1),
61
}, nil
62
}
63
64
// StartJetstream creates the underlying jetstream client and starts a
// background goroutine that connects and reads events.
//
// Each event is passed to processFunc; only when processFunc returns nil is
// the event's TimeUS saved through the DB, so a failed event is not recorded
// as the last processed position.
//
// If the client was built with waitForDid set, the goroutine polls once per
// second until cfg.WantedDids is non-empty before connecting, and gives up
// without connecting if ctx is cancelled first.
//
// The returned error is non-nil only when the jetstream client cannot be
// created; errors that occur while reading are logged by the goroutine.
func (j *JetstreamClient) StartJetstream(ctx context.Context, processFunc func(context.Context, *models.Event) error) error {
	logger := log.FromContext(ctx)

	handle := func(ctx context.Context, e *models.Event) error {
		if err := processFunc(ctx, e); err != nil {
			return err
		}
		return j.db.SaveLastTimeUs(e.TimeUS)
	}

	sched := sequential.NewScheduler(j.ident, logger, handle)

	// Named c rather than client so the package name is not shadowed.
	c, err := client.NewClient(j.cfg, log.New("jetstream"), sched)
	if err != nil {
		return fmt.Errorf("failed to create jetstream client: %w", err)
	}
	j.client = c

	// WantedDids is written under j.mu by AddDid/UpdateDids, so it has to be
	// read under the same lock.
	haveDids := func() bool {
		j.mu.RLock()
		defer j.mu.RUnlock()
		return len(j.cfg.WantedDids) > 0
	}

	go func() {
		lastTimeUs := j.getLastTimeUs(ctx)

		if j.waitForDid {
			ticker := time.NewTicker(time.Second)
			defer ticker.Stop()

			for !haveDids() {
				select {
				case <-ctx.Done():
					// Nothing to connect for any more; do not leak the goroutine.
					logger.Info("context done while waiting for did")
					return
				case <-ticker.C:
				}
			}
		}

		logger.Info("done waiting for did")
		j.connectAndRead(ctx, &lastTimeUs)
	}()

	return nil
}
101
102
func (j *JetstreamClient) connectAndRead(ctx context.Context, cursor *int64) {
103
l := log.FromContext(ctx)
104
for {
105
select {
106
case <-j.reconnectCh:
107
l.Info("(re)connecting jetstream client")
108
j.client.Scheduler.Shutdown()
109
if err := j.client.ConnectAndRead(ctx, cursor); err != nil {
110
l.Error("error reading jetstream", "error", err)
111
}
112
default:
113
if err := j.client.ConnectAndRead(ctx, cursor); err != nil {
114
l.Error("error reading jetstream", "error", err)
115
}
116
}
117
}
118
}
119
120
// getLastTimeUs returns the microsecond timestamp to resume from.
//
// It reads the stored value from the DB. If that read fails, or if the stored
// value is more than a week behind the current time, the current time is used
// instead and written back to the DB. A failure to write back is logged and
// otherwise ignored, so a usable timestamp is always returned.
func (j *JetstreamClient) getLastTimeUs(ctx context.Context) int64 {
	// maxCursorAge is how far behind the current time a stored timestamp may
	// be before it is discarded.
	const maxCursorAge = 7 * 24 * time.Hour

	l := log.FromContext(ctx)
	now := time.Now().UnixMicro()

	lastTimeUs, err := j.db.GetLastTimeUs()
	if err != nil {
		l.Warn("couldn't get last time us, starting from now", "error", err)
		lastTimeUs = now
		if err := j.db.SaveLastTimeUs(lastTimeUs); err != nil {
			l.Error("failed to save last time us", "error", err)
		}
	}

	// If last time is older than a week, start from now
	if now-lastTimeUs > maxCursorAge.Microseconds() {
		lastTimeUs = now
		l.Warn("last time us is older than a week. discarding that and starting from now")
		if err := j.db.SaveLastTimeUs(lastTimeUs); err != nil {
			l.Error("failed to save last time us", "error", err)
		}
	}

	l.Info("found last time_us", "time_us", lastTimeUs)
	return lastTimeUs
}
145