214 lines
5.7 KiB
Python
214 lines
5.7 KiB
Python
import ssl
|
|
import socket
|
|
import struct
|
|
import Mumble_pb2
|
|
import threading
|
|
import time
|
|
import opuslib
|
|
import alsaaudio
|
|
from cryptography.hazmat.primitives.ciphers.aead import AESOCB3
|
|
# Address of the Mumble server this client talks to.
SERVER_IP = "172.16.11.170"
SERVER_PORT = 64738

# KEY and SERVER_NONCE are intentionally not assigned here; they are set by
# the receive loop at the bottom of the file when a CryptSetup message arrives.

# TLS settings: certificate and hostname verification are both switched off,
# so the server's identity is NOT checked.
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE

# Plain TCP socket, wrapped in TLS before connecting so that connect()
# also performs the TLS handshake.
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
tls = ctx.wrap_socket(sock, server_hostname=SERVER_IP)
tls.connect((SERVER_IP, SERVER_PORT))

print("TLS connected to Mumble server!")
print(f"Cipher: {tls.cipher()}")
print(f"TLS version: {tls.version()}")
|
|
|
|
def send_message(tls, msg_type, message):
    """Serialize *message* and write it to *tls* as one framed packet.

    The frame is a 6-byte big-endian header (2-byte message type, 4-byte
    payload length) followed by the serialized payload.

    Args:
        tls: Connected socket-like object providing ``sendall``.
        msg_type: Integer message type placed in the header.
        message: Object providing ``SerializeToString()`` (a protobuf message).
    """
    data = message.SerializeToString()
    header = struct.pack(">HI", msg_type, len(data))
    # sendall() keeps writing until the whole frame is out; a bare send()
    # may write only part of it and desynchronise the stream.
    tls.sendall(header + data)
    print(f"Sent message type: {msg_type} | size: {len(data)}B")
|
|
|
|
def _recv_exact(tls, count):
    """Read exactly *count* bytes from *tls*, or return None if it closes first."""
    chunks = []
    remaining = count
    while remaining > 0:
        chunk = tls.recv(remaining)
        if not chunk:
            # Peer closed the connection before the full amount arrived.
            return None
        chunks.append(chunk)
        remaining -= len(chunk)
    return b"".join(chunks)


def recv_message(tls):
    """Read one framed message from *tls*.

    The frame is a 6-byte big-endian header (2-byte message type, 4-byte
    payload length) followed by the payload. recv() may return fewer bytes
    than requested, so both parts are read in a loop until complete.

    Args:
        tls: Connected socket-like object providing ``recv``.

    Returns:
        ``(msg_type, data)`` for a complete message, or ``(None, None)`` when
        the connection closes before a complete message has been received.
    """
    header = _recv_exact(tls, 6)
    if header is None:
        return None, None
    msg_type, length = struct.unpack(">HI", header)
    data = _recv_exact(tls, length)
    if data is None:
        return None, None
    return msg_type, data
|
|
|
|
def send_ping(tls):
    """Send a Ping message (type 3) every five seconds.

    Intended to run in a background thread. The loop ends when sending
    fails with an OSError (which includes ssl.SSLError), so a dropped
    connection stops the thread quietly instead of raising out of it.

    Args:
        tls: Connected socket passed through to ``send_message``.
    """
    while True:
        ping = Mumble_pb2.Ping()
        ping.timestamp = int(time.time())
        try:
            send_message(tls, 3, ping)  # type 3 = Ping
        except OSError as exc:
            print(f"Ping thread stopping: {exc}")
            return
        print("Ping sent!")
        time.sleep(5)
|
|
|
|
|
|
# step 1 — send Version (type 0)
version = Mumble_pb2.Version(
    version_v1=0x010400,  # 1.4.0 packed as (major << 16) | (minor << 8) | patch
    release="1.4.0",
    os="Linux",
    os_version="Ubuntu",
)
send_message(tls, 0, version)
print("Sent Version!")

# step 2 — send Authenticate (type 2)
auth = Mumble_pb2.Authenticate(
    username="PythonClient",
    password="",  # ← put server password here if needed
    opus=True,
)
send_message(tls, 2, auth)
print("Sent Authenticate!")

print("\nListening for server messages...")
|
|
# Human-readable names for the TCP message type ids, used only for logging.
# Ids follow the Mumble control-channel message table; ids not listed here
# are reported by the receive loop as "Unknown(<id>)".
MSG_TYPES = {
    0: "Version",
    1: "UDPTunnel",
    2: "Authenticate",
    3: "Ping",
    4: "Reject",
    5: "ServerSync",
    6: "ChannelRemove",
    7: "ChannelState",
    8: "UserRemove",
    9: "UserState",
    10: "BanList",
    11: "TextMessage",
    12: "PermissionDenied",
    13: "ACL",
    14: "QueryUsers",
    15: "CryptSetup",
    16: "ContextActionModify",
    17: "ContextAction",
    18: "UserList",
    19: "VoiceTarget",
    20: "PermissionQuery",
    21: "CodecVersion",
    22: "UserStats",
    23: "RequestBlob",
    24: "ServerConfig",  # was mislabelled "TextMessage"; TextMessage is 11
    25: "SuggestConfig",
}
|
|
# Start the keep-alive pinger before entering the main loop. It is a daemon
# thread, so it dies when the main program exits.
ping_thread = threading.Thread(target=send_ping, args=(tls,), daemon=True)
ping_thread.start()
|
|
|
|
|
|
# Samples handed to the decoder and to ALSA per write.
CHUNKSIZE = 960

# Opus decoder: 48000 Hz, 1 channel.
decoder = opuslib.Decoder(48000, 1)

# ALSA playback device opened with the same rate/channel count, 16-bit LE samples.
out = alsaaudio.PCM(
    alsaaudio.PCM_PLAYBACK,
    alsaaudio.PCM_NORMAL,
    channels=1,
    rate=48000,
    format=alsaaudio.PCM_FORMAT_S16_LE,
    periodsize=CHUNKSIZE,
    device='default',
)

# One chunk of silence: CHUNKSIZE samples at 2 bytes each, all zero.
SILENCE = bytes(CHUNKSIZE * 2)
|
|
|
|
|
|
def decrypt_udp(data):
    """Decrypt one UDP datagram using the module-level KEY / SERVER_NONCE.

    Args:
        data: Raw datagram bytes. The first byte is used as the last nonce
            byte and the ciphertext is taken from offset 4 onward.

    Returns:
        The decrypted bytes, or None when no keys are available yet, the
        datagram is shorter than 4 bytes, or decryption fails.
    """
    # KEY / SERVER_NONCE only exist once a CryptSetup message has been
    # handled, so look them up defensively instead of raising NameError.
    # Reading them once also keeps both values consistent for this call.
    module_state = globals()
    key = module_state.get("KEY")
    server_nonce = module_state.get("SERVER_NONCE")
    if key is None or server_nonce is None:
        print("No keys yet!")
        return None
    if len(data) < 4:
        return None

    # NOTE(review): this builds a 16-byte nonce, while cryptography's AESOCB3
    # documents nonces of 12-15 bytes — confirm; if so every call ends in the
    # ValueError branch below. Also confirm OCB3 is the mode the server uses.
    nonce = bytes(server_nonce[0:15]) + bytes([data[0]])
    encrypted = data[4:]

    # Imported here so the block does not depend on a file-level import change.
    from cryptography.exceptions import InvalidTag

    try:
        return AESOCB3(key).decrypt(nonce, encrypted, None)
    except (InvalidTag, ValueError):
        # Best effort: a packet that cannot be decrypted is simply dropped.
        return None
|
|
|
|
def parse_opus(plaintext):
    """Extract the audio payload from a decrypted voice packet.

    Layout read by this function: one header byte whose top three bits are
    the packet type (only type 4 is accepted), a sequence varint that is
    skipped, a length varint (7 data bits per byte, least-significant group
    first, high bit = "more bytes follow"), then the payload.

    NOTE(review): Mumble's documented varint is a prefix-coded format and
    server-to-client voice packets may carry a session id before the
    sequence number — confirm this layout against the protocol docs.

    Args:
        plaintext: Decrypted packet bytes, or None / empty.

    Returns:
        The payload bytes (shorter than the declared length if the packet
        is cut short inside the payload), or None when the input is empty,
        is not type 4, or ends before the length field is complete.
    """
    if not plaintext:
        return None
    if (plaintext[0] >> 5) & 0x7 != 4:
        return None

    size = len(plaintext)

    # Skip the sequence varint: continuation bytes have the high bit set.
    offset = 1
    while offset < size and plaintext[offset] & 0x80:
        offset += 1
    offset += 1  # step past the terminating byte of the sequence varint

    # Parse the length varint; bail out if the packet ends inside it
    # instead of letting an IndexError escape to the caller.
    opus_len = 0
    shift = 0
    while offset < size:
        byte = plaintext[offset]
        offset += 1
        opus_len |= (byte & 0x7F) << shift
        shift += 7
        if not byte & 0x80:
            return plaintext[offset:offset + opus_len]
    return None
|
|
|
|
# UDP listener thread
|
|
def udp_listener():
    """Receive UDP datagrams, decrypt and decode them, and play the audio.

    Runs forever; intended as the target of a background thread. Datagrams
    from any address other than SERVER_IP are ignored. When nothing arrives
    within 50 ms, one chunk of silence is written to the audio device.
    """
    # The context manager closes the socket if the loop is left by an exception.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as udp_sock:
        # NOTE(review): this binds the local socket to the server's own
        # address and port (same values as the original literals), not to
        # "any available port" as the old comment said — confirm the intent.
        udp_sock.bind((SERVER_IP, SERVER_PORT))
        udp_sock.settimeout(0.05)

        print("UDP listener started...")
        while True:
            try:
                data, addr = udp_sock.recvfrom(1024)
            except socket.timeout:
                # KEY / SERVER_NONCE may not exist before CryptSetup has been
                # handled; look them up defensively to avoid a NameError.
                state = globals()
                print(state.get("KEY"), state.get("SERVER_NONCE"), "SWS")
                out.write(SILENCE)
                continue

            if addr[0] != SERVER_IP:
                continue

            plaintext = decrypt_udp(data)
            opus_data = parse_opus(plaintext)

            if opus_data:
                pcm = decoder.decode(opus_data, CHUNKSIZE)
                out.write(pcm)
                print(f"Playing audio | Opus: {len(opus_data)}B → PCM: {len(pcm)}B")
|
|
|
|
# Shared decryption state read by decrypt_udp() and udp_listener(). It is
# defined before the UDP thread starts so those functions never hit a
# NameError, and filled in when the server sends CryptSetup.
KEY = None
SERVER_NONCE = None

udp_thread = threading.Thread(target=udp_listener)
udp_thread.daemon = True
udp_thread.start()

# Main receive loop: read framed TCP messages until the connection closes
# or the server rejects us.
while True:
    msg_type, data = recv_message(tls)
    if msg_type is None:
        print("edjn",msg_type)
        break
    name = MSG_TYPES.get(msg_type, f"Unknown({msg_type})")
    print(f"Received: {name} | size: {len(data)}B",msg_type)

    # parse specific messages
    if msg_type == 0:  # Version
        v = Mumble_pb2.Version()
        v.ParseFromString(data)
        print(f" Server version: {v.release}")

    elif msg_type == 4:  # Reject
        r = Mumble_pb2.Reject()
        r.ParseFromString(data)
        print(f" REJECTED: {r.reason}")
        break

    elif msg_type == 5:  # ServerSync — fully connected!
        s = Mumble_pb2.ServerSync()
        s.ParseFromString(data)
        print(f" Session ID: {s.session}")
        print(f" Welcome: {s.welcome_text}")
        print(" FULLY CONNECTED TO MUMBLE! ✅")

    elif msg_type == 15:  # CryptSetup — encryption keys!
        # This runs at module level, so plain assignment already rebinds the
        # module globals; no `global` statement is needed.
        c = Mumble_pb2.CryptSetup()
        c.ParseFromString(data)
        KEY = bytes(c.key)
        SERVER_NONCE = bytearray(c.server_nonce)
        # NOTE(review): these lines print the session key material in clear.
        print(f" KEY: {c.key.hex()}")
        print(f" CLIENT_NONCE: {c.client_nonce.hex()}")
        print(f" SERVER_NONCE: {c.server_nonce.hex()}")