136 lines
3.3 kB
1
package jetstream
2
3
import (
4
"context"
5
"fmt"
6
"sync"
7
"time"
8
9
"github.com/bluesky-social/jetstream/pkg/client"
10
"github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential"
11
"github.com/bluesky-social/jetstream/pkg/models"
12
"github.com/sotangled/tangled/log"
13
)
14
15
type DB interface {
16
GetLastTimeUs() (int64, error)
17
SaveLastTimeUs(int64) error
18
}
19
20
type JetstreamClient struct {
21
cfg *client.ClientConfig
22
client *client.Client
23
ident string
24
25
db DB
26
reconnectCh chan struct{}
27
mu sync.RWMutex
28
}
29
30
func (j *JetstreamClient) AddDid(did string) {
31
j.mu.Lock()
32
j.cfg.WantedDids = append(j.cfg.WantedDids, did)
33
j.mu.Unlock()
34
j.reconnectCh <- struct{}{}
35
}
36
37
func (j *JetstreamClient) UpdateDids(dids []string) {
38
j.mu.Lock()
39
j.cfg.WantedDids = dids
40
j.mu.Unlock()
41
j.reconnectCh <- struct{}{}
42
}
43
44
func NewJetstreamClient(ident string, collections []string, cfg *client.ClientConfig, db DB) (*JetstreamClient, error) {
45
if cfg == nil {
46
cfg = client.DefaultClientConfig()
47
cfg.WebsocketURL = "wss://jetstream1.us-west.bsky.network/subscribe"
48
cfg.WantedCollections = collections
49
}
50
51
return &JetstreamClient{
52
cfg: cfg,
53
ident: ident,
54
db: db,
55
reconnectCh: make(chan struct{}, 1),
56
}, nil
57
}
58
59
func (j *JetstreamClient) StartJetstream(ctx context.Context, processFunc func(context.Context, *models.Event) error) error {
60
logger := log.FromContext(ctx)
61
62
pf := func(ctx context.Context, e *models.Event) error {
63
err := processFunc(ctx, e)
64
if err != nil {
65
return err
66
}
67
68
if err := j.db.SaveLastTimeUs(e.TimeUS); err != nil {
69
return err
70
}
71
72
return nil
73
}
74
75
sched := sequential.NewScheduler(j.ident, logger, pf)
76
77
client, err := client.NewClient(j.cfg, log.New("jetstream"), sched)
78
if err != nil {
79
return fmt.Errorf("failed to create jetstream client: %w", err)
80
}
81
j.client = client
82
83
go func() {
84
lastTimeUs := j.getLastTimeUs(ctx)
85
for len(j.cfg.WantedDids) == 0 {
86
time.Sleep(time.Second)
87
}
88
j.connectAndRead(ctx, &lastTimeUs)
89
}()
90
91
return nil
92
}
93
94
func (j *JetstreamClient) connectAndRead(ctx context.Context, cursor *int64) {
95
l := log.FromContext(ctx)
96
for {
97
select {
98
case <-j.reconnectCh:
99
l.Info("(re)connecting jetstream client")
100
j.client.Scheduler.Shutdown()
101
if err := j.client.ConnectAndRead(ctx, cursor); err != nil {
102
l.Error("error reading jetstream", "error", err)
103
}
104
default:
105
if err := j.client.ConnectAndRead(ctx, cursor); err != nil {
106
l.Error("error reading jetstream", "error", err)
107
}
108
}
109
}
110
}
111
112
func (j *JetstreamClient) getLastTimeUs(ctx context.Context) int64 {
113
l := log.FromContext(ctx)
114
lastTimeUs, err := j.db.GetLastTimeUs()
115
if err != nil {
116
l.Warn("couldn't get last time us, starting from now", "error", err)
117
lastTimeUs = time.Now().UnixMicro()
118
err = j.db.SaveLastTimeUs(lastTimeUs)
119
if err != nil {
120
l.Error("failed to save last time us")
121
}
122
}
123
124
// If last time is older than a week, start from now
125
if time.Now().UnixMicro()-lastTimeUs > 7*24*60*60*1000*1000 {
126
lastTimeUs = time.Now().UnixMicro()
127
l.Warn("last time us is older than a week. discarding that and starting from now")
128
err = j.db.SaveLastTimeUs(lastTimeUs)
129
if err != nil {
130
l.Error("failed to save last time us")
131
}
132
}
133
134
l.Info("found last time_us", "time_us", lastTimeUs)
135
return lastTimeUs
136
}
137