50 lines
1.1 kB
1
package state
2
3
import (
4
"context"
5
"encoding/json"
6
"fmt"
7
"log"
8
9
"github.com/bluesky-social/jetstream/pkg/models"
10
tangled "github.com/sotangled/tangled/api/tangled"
11
"github.com/sotangled/tangled/appview/db"
12
)
13
14
type Ingester func(ctx context.Context, e *models.Event) error
15
16
func jetstreamIngester(db *db.DB) Ingester {
17
return func(ctx context.Context, e *models.Event) error {
18
var err error
19
defer func() {
20
eventTime := e.TimeUS
21
lastTimeUs := eventTime + 1
22
if err := db.UpdateLastTimeUs(lastTimeUs); err != nil {
23
err = fmt.Errorf("(deferred) failed to save last time us: %w", err)
24
}
25
}()
26
27
if e.Kind != models.EventKindCommit {
28
return nil
29
}
30
31
did := e.Did
32
raw := json.RawMessage(e.Commit.Record)
33
34
switch e.Commit.Collection {
35
case tangled.GraphFollowNSID:
36
record := tangled.GraphFollow{}
37
err := json.Unmarshal(raw, &record)
38
if err != nil {
39
log.Println("invalid record")
40
return err
41
}
42
err = db.AddFollow(did, record.Subject, e.Commit.RKey)
43
if err != nil {
44
return fmt.Errorf("failed to add follow to db: %w", err)
45
}
46
}
47
48
return err
49
}
50
}
51