import asyncio
import json
import websockets
import time
import threading
from asyncio import Queue


class PhoenixClient:
    """Small asyncio client for a Phoenix-style channel over a WebSocket.

    The client joins the ``skeet:lobby`` topic, sends ``new_message`` events
    and prints incoming ``new_message`` broadcasts. Every outgoing frame
    carries a unique ``ref``; the frame that comes back with the same ``ref``
    is handed to the coroutine that sent the request.

    Attributes:
        uri: WebSocket endpoint to connect to.
        ref: Counter from which message refs are generated.
        running: True while the background reader should keep reading.
        websocket: The open connection, or None before ``connect`` succeeds
            in opening one.
        receive_lock: Not used by any method of this class.
        message_queue: Not used by any method of this class.
        response_queues: Maps each pending ref to the queue its reply is
            delivered on.
    """

    def __init__(self, uri="wss://thoughts-and-tidbits.fly.dev/socket/websocket"):
        self.uri = uri
        self.ref = 0
        self.running = False
        # Defined up front so close() is safe even if connect() never ran.
        self.websocket = None
        self.receive_lock = asyncio.Lock()
        self.message_queue = Queue()
        self.response_queues = {}
        # Strong reference to the reader task; the event loop itself only
        # keeps weak references to tasks.
        self._processor_task = None

    def _get_ref(self):
        """Return the next ref as a string and register a reply queue for it."""
        self.ref += 1
        ref = str(self.ref)
        self.response_queues[ref] = Queue()
        return ref

    def _create_message(self, event_type, topic, payload=None):
        """Serialise one outgoing frame to a JSON string.

        A fresh ref is allocated (and its reply queue registered) on every
        call. A missing or empty ``payload`` is sent as ``{}``.
        """
        return json.dumps({
            "topic": topic,
            "event": event_type,
            "payload": payload or {},
            "ref": self._get_ref()
        })

    async def _send_and_wait(self, message):
        """Send an already-serialised frame and return the frame answering it.

        Raises:
            ConnectionError: the reader stopped before a reply arrived.
        """
        # _create_message returns a JSON string, so the ref has to be read
        # back out of it rather than indexed directly.
        ref = json.loads(message)["ref"]
        try:
            await self.websocket.send(message)
            reply = await self.response_queues[ref].get()
        finally:
            # Drop the queue whether or not a reply arrived, so that
            # response_queues does not grow by one entry per request.
            self.response_queues.pop(ref, None)
        if reply is None:
            # None is the wake-up marker _process_messages leaves on exit.
            raise ConnectionError("Connection closed before a reply was received")
        return reply

    async def connect(self):
        """Open the WebSocket, start the background reader and join the lobby.

        Returns:
            True once a reply to the join request has been received, False
            if anything failed. Failures are printed, not raised, and any
            connection that was already opened is closed again.
        """
        print(f"Connecting to {self.uri}...")
        try:
            self.websocket = await websockets.connect(
                self.uri,
                ssl=True,
                ping_interval=30,
                ping_timeout=10
            )

            # Start the message processor
            self.running = True
            self._processor_task = asyncio.create_task(self._process_messages())

            # Join the skeet channel
            join_message = self._create_message("phx_join", "skeet:lobby")
            response = await self._send_and_wait(join_message)
            print(f"Join response: {response}")
            return True
        except Exception as e:
            print(f"Connection error: {e}")
            # Do not leave a half-open socket and a live reader task behind.
            self.running = False
            if self.websocket is not None:
                try:
                    await self.websocket.close()
                except Exception as close_error:
                    print(f"Error closing connection: {close_error}")
            return False

    @staticmethod
    def _print_broadcast(payload):
        """Print one ``new_message`` broadcast and re-issue the input prompt."""
        print(f"\n📨 New message from {payload.get('user')}:")
        print(f" {payload.get('body')}")
        if payload.get('reply_to'):
            print(f" ↳ Reply to: {payload.get('reply_to')}")
        print("\nEnter message (or 'quit' to exit): ", end='', flush=True)

    async def _process_messages(self):
        """Read frames until the connection closes or ``running`` is cleared.

        Frames with a non-null ``ref`` are delivered to the coroutine waiting
        on that ref. Frames whose ``ref`` is absent or null are treated as
        server pushes, and ``new_message`` pushes are printed.
        """
        try:
            while self.running:
                try:
                    message = await self.websocket.recv()
                    data = json.loads(message)

                    # A push may carry "ref": null rather than omit the key,
                    # so test the value, not the key's presence.
                    ref = data.get("ref")
                    if ref is not None:
                        # This is a response to a sent message
                        queue = self.response_queues.get(ref)
                        if queue is not None:
                            await queue.put(data)
                    elif data.get("event") == "new_message":
                        # Handle broadcasts
                        self._print_broadcast(data.get("payload") or {})
                except asyncio.CancelledError:
                    # Never swallow cancellation in the generic handler below.
                    raise
                except websockets.exceptions.ConnectionClosed:
                    break
                except Exception as e:
                    print(f"Error processing message: {e}")
        finally:
            self.running = False
            # Wake everyone still waiting for a reply; they would otherwise
            # block forever now that nothing reads the socket.
            for queue in list(self.response_queues.values()):
                queue.put_nowait(None)

    async def listen_for_messages(self):
        """Wait until the background reader started by ``connect`` has stopped.

        Returns immediately if the reader was never started.
        """
        if self._processor_task is not None:
            # asyncio.wait does not cancel the reader if this waiter is
            # cancelled, and does not re-raise the reader's outcome.
            await asyncio.wait({self._processor_task})

    async def send_message(self, body, user, reply_to=None):
        """Send a ``new_message`` event to ``skeet:lobby``.

        Args:
            body: Message text.
            user: Name to send the message under.
            reply_to: Id of the message being answered, or None.

        Returns:
            The decoded frame that answered this request.

        Raises:
            ConnectionError: the reader stopped before a reply arrived.
        """
        message = self._create_message(
            "new_message",
            "skeet:lobby",
            {
                "body": body,
                "user": user,
                "reply_to": reply_to
            }
        )
        return await self._send_and_wait(message)

    async def close(self):
        """Stop the reader loop and close the connection, if one was opened."""
        self.running = False
        if self.websocket is not None:
            await self.websocket.close()


async def main():
    """Run the interactive chat session on the terminal.

    Connects a ``PhoenixClient``, asks for a username of 1-16 characters and
    then sends each entered line as a message until the user types ``quit``,
    stdin reaches end-of-file, or the connection is lost. A line of the form
    ``r:<message_id>: <text>`` is sent as a reply to ``<message_id>``.
    Messages longer than 250 characters are rejected locally.
    """
    client = PhoenixClient()
    if not await client.connect():
        print("Failed to connect")
        return

    loop = asyncio.get_running_loop()

    async def ask(prompt):
        # input() blocks its thread. Running it on a worker thread keeps the
        # event loop free, so the reader task that connect() started can
        # still receive frames while the user is typing.
        # NOTE(review): after Ctrl-C the worker may stay blocked in input()
        # until Enter is pressed - confirm this is acceptable on shutdown.
        line = await loop.run_in_executor(None, input, prompt)
        return line.strip()

    try:
        # Get user's name first
        username = await ask("Enter your username (max 16 chars): ")
        while not username or len(username) > 16:
            username = await ask("Please enter a valid username (1-16 chars): ")

        print("\nConnected! You can now send messages.")
        print("To reply to a message, start your message with 'r:message_id:'")
        print("Example: r:abc123: This is a reply")
        print("Enter 'quit' to exit")

        # No separate listener is started here: connect() already runs the
        # client's reader in the background.
        while True:
            try:
                message = await ask("\nEnter message (or 'quit' to exit): ")
            except EOFError:
                # stdin is exhausted; prompting again would fail forever.
                break

            if message.lower() == 'quit':
                break

            if not client.running:
                # The reader has stopped, so no reply could ever be delivered.
                print("Connection lost")
                break

            reply_to = None
            if message.startswith('r:'):
                parts = message.split(':', 2)
                reply_id = parts[1].strip() if len(parts) == 3 else ""
                if not reply_id:
                    print("Invalid reply format. Use: r:message_id: your message")
                    continue
                reply_to = reply_id
                message = parts[2].strip()

            if not message:
                continue

            if len(message) > 250:
                print("Message too long (max 250 chars)")
                continue

            try:
                response = await client.send_message(
                    body=message,
                    user=username,
                    reply_to=reply_to
                )
                payload = response.get("payload") or {}
            except Exception as e:
                print(f"Error sending message: {e}")
                continue

            # Phoenix replies normally have the shape
            # {"status": ..., "response": {...}}.
            # NOTE(review): where this server puts the new message id is not
            # visible here; both the top level and "response" are checked.
            if payload.get("status") == "error":
                print(f"Error sending message: {payload.get('response')}")
                continue

            message_id = payload.get("id")
            if message_id is None and isinstance(payload.get("response"), dict):
                message_id = payload["response"].get("id")

            if message_id is None:
                print("✓ Message sent")
            else:
                print(f"✓ Message sent (ID: {message_id})")

    except Exception as e:
        print(f"Error in main loop: {e}")
    finally:
        await client.close()


if __name__ == "__main__":
    # Script entry point: run the chat session on a fresh event loop.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl-C ends the session without a traceback.
        print("\nGoodbye!")