61 lines
1.7 kB
1
defmodule BlueskyHose do
2
use WebSockex
3
require Logger
4
alias Blog.Social.Skeet
5
alias Blog.Repo
6
7
def start_link(opts \\ []) do
8
WebSockex.start_link("wss://bsky-relay.c.theo.io/subscribe?wantedCollections=app.bsky.feed.post", __MODULE__, :fake_state, opts)
9
end
10
11
def handle_connect(_conn, _state) do
12
Logger.info("Connected!")
13
IO.puts("#{DateTime.utc_now}")
14
{:ok, 0}
15
end
16
17
def handle_frame({:text, msg}, state) do
18
msg = Jason.decode!(msg)
19
case msg do
20
%{"commit" => %{"record" => %{"text" => skeet}}} = msg ->
21
# IO.puts(skeet)
22
23
if rem(state, 1200) == 0 do
24
# save every 3600th message
25
Logger.info("Saving skeet #{state}")
26
%Skeet{}
27
|> Skeet.changeset(%{skeet: skeet})
28
# |> Repo.insert()
29
end
30
case String.contains?(String.downcase(skeet), "muenster") do
31
true ->
32
IO.puts("Got cheese skeet\n\n\n\n#{skeet}")
33
34
# Persist the skeet, it doesnt matter if its a duplicate cuz we have a unique constraint
35
%Skeet{}
36
|> Skeet.changeset(%{skeet: skeet})
37
|> Repo.insert()
38
39
# Broadcast to PubSub
40
Phoenix.PubSub.broadcast(
41
Blog.PubSub,
42
"muenster_posts",
43
{:new_post, skeet}
44
)
45
false -> :do_nothing
46
end
47
_ ->
48
nil
49
end
50
{:ok, state + 1}
51
end
52
53
def handle_disconnect(%{reason: {:local, reason}}, state) do
54
Logger.info("Local close with reason: #{inspect reason}")
55
{:ok, state}
56
end
57
58
def handle_disconnect(disconnect_map, state) do
59
super(disconnect_map, state)
60
end
61
end
62