# File-viewer metadata: 136 lines, 3.6 kB
1
defmodule BlueskyHose do
  @moduledoc """
  WebSockex client that consumes `app.bsky.feed.post` events from a Bluesky relay.

  For every text frame whose JSON contains `commit.record.text`:

    * the post text is broadcast on `Blog.PubSub`, topic `"bluesky:skeet"`, as
      `{:new_skeet, text}`;
    * if the text contains Reddit URLs, a record
      `%{original_text: _, links: _, time: _, id: _}` is inserted into the
      `:reddit_links` ETS table and broadcast on topic `"reddit_links"` as
      `{:reddit_link, record}`.

  The process state is the number of text frames handled since the last connect.

  NOTE(review): nothing in this module ever deletes rows from the ETS table, so
  it grows for as long as the owning process lives.
  """

  use WebSockex
  require Logger

  @table_name :reddit_links
  @relay_url "wss://bsky-relay.c.theo.io/subscribe?wantedCollections=app.bsky.feed.post"

  # Match spec selecting only the value of each `{key, value}` row.
  @values_spec [{{:_, :"$1"}, [], [:"$1"]}]

  @doc """
  Creates the `:reddit_links` ETS table (if it does not exist yet) and starts the
  websocket client.

  `opts` is forwarded to `WebSockex.start_link/4`. The optional `:url` key
  overrides the default relay URL and is removed before forwarding.

  The table is created by the process that calls this function, so that process
  owns it.
  """
  def start_link(opts \\ []) do
    ensure_table()
    {url, opts} = Keyword.pop(opts, :url, @relay_url)
    WebSockex.start_link(url, __MODULE__, :fake_state, opts)
  end

  @doc """
  Returns up to `limit` stored Reddit-link records, newest first.

  Returns `[]` when the table does not exist. A `limit` of `0` yields `[]`; a
  negative `limit` yields the last `abs(limit)` records, following
  `Enum.take/2` semantics.
  """
  def get_reddit_links(limit \\ 50) when is_integer(limit) do
    case :ets.info(@table_name) do
      :undefined -> []
      _info -> select_records(limit)
    end
  end

  # --- WebSockex callbacks ---------------------------------------------------

  @impl true
  def handle_connect(_conn, _state) do
    Logger.info("Connected!")
    IO.puts("#{DateTime.utc_now()}")
    # Restart the frame counter on every (re)connect.
    {:ok, 0}
  end

  @impl true
  def handle_frame({:text, payload}, state) do
    # Non-raising decode: one malformed frame must not take the socket down.
    case Jason.decode(payload) do
      {:ok, %{"commit" => %{"record" => %{"text" => skeet}}}} ->
        process_skeet(skeet)

      {:ok, _other} ->
        :ok

      {:error, error} ->
        Logger.warning("Ignoring undecodable frame: #{inspect(error)}")
    end

    {:ok, state + 1}
  end

  # Non-text frames carry nothing this module consumes; ignore them instead of
  # failing with a FunctionClauseError.
  def handle_frame(_frame, state), do: {:ok, state}

  @impl true
  def handle_disconnect(%{reason: {:local, reason}}, state) do
    Logger.info("Local close with reason: #{inspect(reason)}")
    {:ok, state}
  end

  def handle_disconnect(disconnect_map, state) do
    super(disconnect_map, state)
  end

  # --- Internals -------------------------------------------------------------

  # Creates the named table, treating "already exists" as success. The rescue is
  # scoped to `:ets.new/2` only so unrelated ArgumentErrors are not swallowed.
  defp ensure_table do
    :ets.new(@table_name, [:named_table, :ordered_set, :public, read_concurrency: true])
    :ok
  rescue
    ArgumentError -> :ok
  end

  # Broadcasts the post text, then stores/broadcasts its Reddit links if any.
  defp process_skeet(skeet) do
    Phoenix.PubSub.broadcast(Blog.PubSub, "bluesky:skeet", {:new_skeet, skeet})

    if contains_reddit_link?(skeet) do
      Logger.info("Reddit link found in skeet: #{skeet}")
      publish_reddit_links(skeet, extract_reddit_links(skeet))
    end
  end

  # Stores the record in ETS and notifies subscribers; no-op without links.
  defp publish_reddit_links(_skeet, []), do: :ok

  defp publish_reddit_links(skeet, links) do
    timestamp = DateTime.utc_now()

    record = %{
      original_text: skeet,
      links: links,
      time: timestamp,
      id: generate_id()
    }

    # Negated time puts the newest row first in the ordered_set. Microsecond
    # resolution plus the random id keeps keys unique, so posts arriving within
    # the same second no longer overwrite each other.
    key = {-DateTime.to_unix(timestamp, :microsecond), record.id}
    :ets.insert(@table_name, {key, record})

    Phoenix.PubSub.broadcast(Blog.PubSub, "reddit_links", {:reddit_link, record})
  end

  # Cheap pre-filter used before running the full URL extraction.
  defp contains_reddit_link?(skeet) when is_binary(skeet) do
    String.match?(skeet, ~r/reddit\.com|redd\.it/i)
  end

  defp contains_reddit_link?(_), do: false

  # Returns the unique Reddit URLs found in `text`, each with a scheme.
  defp extract_reddit_links(text) do
    # Optional scheme, any number of subdomains (www., old., i., v., ...), then
    # the Reddit domain and a path up to whitespace/quote/bracket characters.
    regex =
      ~r/(?:https?:\/\/)?(?:[a-z0-9-]+\.)*(?:reddit\.com|redd\.it)\/[^\s"'<>()\[\]{}]+/i

    regex
    |> Regex.scan(text)
    |> Enum.map(fn [full_match | _] -> normalize_url(full_match) end)
    |> Enum.uniq()
  end

  # Strips trailing sentence punctuation and prepends "https://" when the match
  # has no scheme. The scheme test is case-insensitive, like the extraction regex.
  defp normalize_url(raw_url) do
    url = Regex.replace(~r/[.,;:!?]+$/, raw_url, "")

    if String.match?(url, ~r/^https?:\/\//i) do
      url
    else
      "https://#{url}"
    end
  end

  defp generate_id do
    :crypto.strong_rand_bytes(10) |> Base.encode16(case: :lower)
  end

  # Positive limits let ETS stop after `limit` rows instead of copying the whole
  # table; an ordered_set is traversed in key order, so no extra sort is needed.
  defp select_records(limit) when limit > 0 do
    case :ets.select(@table_name, @values_spec, limit) do
      {records, _continuation} -> records
      :"$end_of_table" -> []
    end
  end

  # Zero/negative limits keep Enum.take/2 semantics over the full ordered list.
  defp select_records(limit) do
    @table_name
    |> :ets.select(@values_spec)
    |> Enum.take(limit)
  end
end
137