140 lines
3.7 kB
1
package jetstream
2
3
import (
4
"context"
5
"fmt"
6
"sync"
7
"time"
8
9
"github.com/bluesky-social/jetstream/pkg/client"
10
"github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential"
11
"github.com/bluesky-social/jetstream/pkg/models"
12
"github.com/sotangled/tangled/log"
13
)
14
15
// DB is the persistence hook the jetstream client uses to remember how far
// it has read. Values are Unix timestamps expressed in microseconds.
type DB interface {
	// GetLastTimeUs returns the most recently stored cursor.
	GetLastTimeUs() (int64, error)
	// SaveLastTimeUs stores timeUs as the new cursor.
	SaveLastTimeUs(timeUs int64) error
}
19
20
type JetstreamClient struct {
21
cfg *client.ClientConfig
22
client *client.Client
23
ident string
24
25
db DB
26
reconnectCh chan struct{}
27
waitForDid bool
28
mu sync.RWMutex
29
}
30
31
// AddDid appends did to the wanted DIDs and then signals the read loop to
// reconnect. An empty did is ignored and no signal is sent.
//
// The signal is a plain channel send, so this call blocks while a previous
// reconnect signal is still waiting to be consumed.
func (j *JetstreamClient) AddDid(did string) {
	if len(did) == 0 {
		return
	}

	// Only the slice update needs the lock; release it before signalling so
	// a blocked send never holds the mutex.
	j.mu.Lock()
	j.cfg.WantedDids = append(j.cfg.WantedDids, did)
	j.mu.Unlock()

	var signal struct{}
	j.reconnectCh <- signal
}
40
41
// UpdateDids appends every non-empty entry of dids to the wanted DIDs and
// then signals the read loop to reconnect.
//
// Empty strings are skipped, matching AddDid. The reconnect signal is sent
// even when nothing was appended, and the send blocks while a previous
// reconnect signal is still waiting to be consumed.
func (j *JetstreamClient) UpdateDids(dids []string) {
	j.mu.Lock()
	for _, did := range dids {
		// The append must live inside this check; otherwise empty DIDs end
		// up in the subscription filter.
		if did != "" {
			j.cfg.WantedDids = append(j.cfg.WantedDids, did)
		}
	}
	j.mu.Unlock()

	j.reconnectCh <- struct{}{}
}
51
52
func NewJetstreamClient(ident string, collections []string, cfg *client.ClientConfig, db DB, waitForDid bool) (*JetstreamClient, error) {
53
if cfg == nil {
54
cfg = client.DefaultClientConfig()
55
cfg.WebsocketURL = "wss://jetstream1.us-west.bsky.network/subscribe"
56
cfg.WantedCollections = collections
57
}
58
59
return &JetstreamClient{
60
cfg: cfg,
61
ident: ident,
62
db: db,
63
64
// This will make the goroutine in StartJetstream wait until
65
// cfg.WantedDids has been populated, typically using UpdateDids.
66
waitForDid: waitForDid,
67
reconnectCh: make(chan struct{}, 1),
68
}, nil
69
}
70
71
// StartJetstream starts the jetstream client and processes events using the provided processFunc.
72
// The caller is responsible for saving the last time_us to the database (just use your db.SaveLastTimeUs).
73
func (j *JetstreamClient) StartJetstream(ctx context.Context, processFunc func(context.Context, *models.Event) error) error {
74
logger := log.FromContext(ctx)
75
76
sched := sequential.NewScheduler(j.ident, logger, processFunc)
77
78
client, err := client.NewClient(j.cfg, log.New("jetstream"), sched)
79
if err != nil {
80
return fmt.Errorf("failed to create jetstream client: %w", err)
81
}
82
j.client = client
83
84
go func() {
85
lastTimeUs := j.getLastTimeUs(ctx)
86
if j.waitForDid {
87
for len(j.cfg.WantedDids) == 0 {
88
time.Sleep(time.Second)
89
}
90
}
91
logger.Info("done waiting for did")
92
j.connectAndRead(ctx, &lastTimeUs)
93
}()
94
95
return nil
96
}
97
98
func (j *JetstreamClient) connectAndRead(ctx context.Context, cursor *int64) {
99
l := log.FromContext(ctx)
100
for {
101
select {
102
case <-j.reconnectCh:
103
l.Info("(re)connecting jetstream client")
104
j.client.Scheduler.Shutdown()
105
if err := j.client.ConnectAndRead(ctx, cursor); err != nil {
106
l.Error("error reading jetstream", "error", err)
107
}
108
default:
109
if err := j.client.ConnectAndRead(ctx, cursor); err != nil {
110
l.Error("error reading jetstream", "error", err)
111
}
112
}
113
}
114
}
115
116
// getLastTimeUs returns the cursor to resume from, as a Unix timestamp in
// microseconds.
//
// The stored value is loaded from j.db. If loading fails, or the stored
// value is more than a week old, the current time is used instead and is
// written back to j.db. A failure to write it back is logged and does not
// change the returned value.
func (j *JetstreamClient) getLastTimeUs(ctx context.Context) int64 {
	// One week in microseconds; stored cursors older than this are discarded.
	const maxCursorAgeUs = 7 * 24 * 60 * 60 * 1000 * 1000

	l := log.FromContext(ctx)

	lastTimeUs, err := j.db.GetLastTimeUs()
	if err != nil {
		l.Warn("couldn't get last time us, starting from now", "error", err)
		lastTimeUs = time.Now().UnixMicro()
		if err := j.db.SaveLastTimeUs(lastTimeUs); err != nil {
			// Include the cause so a persistent storage failure can be
			// diagnosed from the log.
			l.Error("failed to save last time us", "error", err)
		}
	}

	// If last time is older than a week, start from now
	if time.Now().UnixMicro()-lastTimeUs > maxCursorAgeUs {
		lastTimeUs = time.Now().UnixMicro()
		l.Warn("last time us is older than a week. discarding that and starting from now")
		if err := j.db.SaveLastTimeUs(lastTimeUs); err != nil {
			l.Error("failed to save last time us", "error", err)
		}
	}

	l.Info("found last time_us", "time_us", lastTimeUs)
	return lastTimeUs
}
141