147 lines
4.1 kB
1
package knotserver
2
3
import (
4
"context"
5
"encoding/json"
6
"fmt"
7
"io"
8
"net/http"
9
"net/url"
10
"strings"
11
12
"github.com/bluesky-social/jetstream/pkg/models"
13
"github.com/sotangled/tangled/api/tangled"
14
"github.com/sotangled/tangled/knotserver/db"
15
"github.com/sotangled/tangled/log"
16
)
17
18
func (h *Handle) processPublicKey(ctx context.Context, did string, record tangled.PublicKey) error {
19
l := log.FromContext(ctx)
20
pk := db.PublicKey{
21
Did: did,
22
PublicKey: record,
23
}
24
if err := h.db.AddPublicKey(pk); err != nil {
25
l.Error("failed to add public key", "error", err)
26
return fmt.Errorf("failed to add public key: %w", err)
27
}
28
l.Info("added public key from firehose", "did", did)
29
return nil
30
}
31
32
func (h *Handle) processKnotMember(ctx context.Context, did string, record tangled.KnotMember) error {
33
l := log.FromContext(ctx)
34
35
if record.Domain != h.c.Server.Hostname {
36
l.Error("domain mismatch", "domain", record.Domain, "expected", h.c.Server.Hostname)
37
return fmt.Errorf("domain mismatch: %s != %s", record.Domain, h.c.Server.Hostname)
38
}
39
40
ok, err := h.e.E.Enforce(did, ThisServer, ThisServer, "server:invite")
41
if err != nil || !ok {
42
l.Error("failed to add member", "did", did)
43
return fmt.Errorf("failed to enforce permissions: %w", err)
44
}
45
46
if err := h.e.AddMember(ThisServer, record.Member); err != nil {
47
l.Error("failed to add member", "error", err)
48
return fmt.Errorf("failed to add member: %w", err)
49
}
50
l.Info("added member from firehose", "member", record.Member)
51
52
if err := h.db.AddDid(did); err != nil {
53
l.Error("failed to add did", "error", err)
54
return fmt.Errorf("failed to add did: %w", err)
55
}
56
57
if err := h.fetchAndAddKeys(ctx, did); err != nil {
58
return fmt.Errorf("failed to fetch and add keys: %w", err)
59
}
60
61
return nil
62
}
63
64
func (h *Handle) fetchAndAddKeys(ctx context.Context, did string) error {
65
l := log.FromContext(ctx)
66
67
keysEndpoint, err := url.JoinPath(h.c.AppViewEndpoint, "keys", did)
68
if err != nil {
69
l.Error("error building endpoint url", "did", did, "error", err.Error())
70
return fmt.Errorf("error building endpoint url: %w", err)
71
}
72
73
resp, err := http.Get(keysEndpoint)
74
if err != nil {
75
l.Error("error getting keys", "did", did, "error", err)
76
return fmt.Errorf("error getting keys: %w", err)
77
}
78
defer resp.Body.Close()
79
80
if resp.StatusCode == http.StatusNotFound {
81
l.Info("no keys found for did", "did", did)
82
return nil
83
}
84
85
plaintext, err := io.ReadAll(resp.Body)
86
if err != nil {
87
l.Error("error reading response body", "error", err)
88
return fmt.Errorf("error reading response body: %w", err)
89
}
90
91
for _, key := range strings.Split(string(plaintext), "\n") {
92
if key == "" {
93
continue
94
}
95
pk := db.PublicKey{
96
Did: did,
97
}
98
pk.Key = key
99
if err := h.db.AddPublicKey(pk); err != nil {
100
l.Error("failed to add public key", "error", err)
101
return fmt.Errorf("failed to add public key: %w", err)
102
}
103
}
104
return nil
105
}
106
107
func (h *Handle) processMessages(ctx context.Context, event *models.Event) error {
108
did := event.Did
109
if event.Kind != models.EventKindCommit {
110
return nil
111
}
112
113
var err error
114
defer func() {
115
eventTime := event.TimeUS
116
lastTimeUs := eventTime + 1
117
fmt.Println("lastTimeUs", lastTimeUs)
118
if err := h.db.UpdateLastTimeUs(lastTimeUs); err != nil {
119
err = fmt.Errorf("(deferred) failed to save last time us: %w", err)
120
}
121
h.jc.UpdateDids([]string{did})
122
}()
123
124
raw := json.RawMessage(event.Commit.Record)
125
126
switch event.Commit.Collection {
127
case tangled.PublicKeyNSID:
128
var record tangled.PublicKey
129
if err := json.Unmarshal(raw, &record); err != nil {
130
return fmt.Errorf("failed to unmarshal record: %w", err)
131
}
132
if err := h.processPublicKey(ctx, did, record); err != nil {
133
return fmt.Errorf("failed to process public key: %w", err)
134
}
135
136
case tangled.KnotMemberNSID:
137
var record tangled.KnotMember
138
if err := json.Unmarshal(raw, &record); err != nil {
139
return fmt.Errorf("failed to unmarshal record: %w", err)
140
}
141
if err := h.processKnotMember(ctx, did, record); err != nil {
142
return fmt.Errorf("failed to process knot member: %w", err)
143
}
144
}
145
146
return err
147
}
148