import ssl import socket import struct import Mumble_pb2 import threading import time import opuslib import alsaaudio from cryptography.hazmat.primitives.ciphers.aead import AESOCB3 SERVER_IP = "172.16.11.170" SERVER_PORT = 64738 # KEY = None # SERVER_NONCE = None # create raw TCP socket sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # wrap with TLS ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) ctx.check_hostname = False ctx.verify_mode = ssl.CERT_NONE tls = ctx.wrap_socket(sock, server_hostname=SERVER_IP) tls.connect((SERVER_IP, SERVER_PORT)) print("TLS connected to Mumble server!") print(f"Cipher: {tls.cipher()}") print(f"TLS version: {tls.version()}") def send_message(tls, msg_type, message): data = message.SerializeToString() header = struct.pack(">HI", msg_type, len(data)) tls.send(header + data) print(f"Sent message type: {msg_type} | size: {len(data)}B") def recv_message(tls): header = tls.recv(6) if len(header) < 6: return None, None msg_type, length = struct.unpack(">HI", header) data = tls.recv(length) return msg_type, data def send_ping(tls): while True: ping = Mumble_pb2.Ping() ping.timestamp = int(time.time()) send_message(tls, 3, ping) # type 3 = Ping print("Ping sent!") time.sleep(5) # step 1 — send Version version = Mumble_pb2.Version() version.version_v1 = (1 << 16) | (4 << 8) | 0 # 1.4.0 version.release = "1.4.0" version.os = "Linux" version.os_version = "Ubuntu" send_message(tls, 0, version) # type 0 = Version print("Sent Version!") # step 2 — send Authenticate auth = Mumble_pb2.Authenticate() auth.username = "PythonClient" auth.password = "" # ← put server password here if needed auth.opus = True send_message(tls, 2, auth) # type 2 = Authenticate print("Sent Authenticate!") print("\nListening for server messages...") MSG_TYPES = { 0: "Version", 1: "UDPTunnel", 2: "Authenticate", 3: "Ping", 4: "Reject", 5: "ServerSync", 6: "ChannelRemove", 7: "ChannelState", 8: "UserRemove", 9: "UserState", 15: "CryptSetup", 24: "TextMessage", } # start ping thread before main loop ping_thread = threading.Thread(target=send_ping, args=(tls,)) ping_thread.daemon = True # dies when main program exits ping_thread.start() decoder = opuslib.Decoder(48000, 1) CHUNKSIZE = 960 out = alsaaudio.PCM( alsaaudio.PCM_PLAYBACK, alsaaudio.PCM_NORMAL, channels=1, rate=48000, format=alsaaudio.PCM_FORMAT_S16_LE, periodsize=CHUNKSIZE, device='default' ) SILENCE = b'\x00' * (CHUNKSIZE * 2) def decrypt_udp(data): if KEY is None or SERVER_NONCE is None: print("No keys yet!") return None if len(data) < 4: return None nonce = bytes(SERVER_NONCE[0:15]) + bytes([data[0]]) encrypted = data[4:] try: plaintext = AESOCB3(KEY).decrypt(nonce, encrypted, None) return plaintext except: return None def parse_opus(plaintext): if not plaintext: return None header = plaintext[0] pkt_type = (header >> 5) & 0x7 if pkt_type != 4: return None offset = 1 # skip sequence varint while plaintext[offset] & 0x80: offset += 1 offset += 1 # parse opus length varint opus_len = 0 shift = 0 while True: byte = plaintext[offset] opus_len |= (byte & 0x7F) << shift offset += 1 shift += 7 if not (byte & 0x80): break return plaintext[offset:offset + opus_len] # UDP listener thread def udp_listener(): udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) udp_sock.bind(("172.16.11.170", 64738)) # bind to any available port udp_sock.settimeout(0.05) print("UDP listener started...") while True: try: data, addr = udp_sock.recvfrom(1024) if addr[0] != SERVER_IP: continue plaintext = decrypt_udp(data) opus_data = parse_opus(plaintext) if opus_data: pcm = decoder.decode(opus_data, CHUNKSIZE) out.write(pcm) print(f"Playing audio | Opus: {len(opus_data)}B → PCM: {len(pcm)}B") except socket.timeout: print(KEY,SERVER_NONCE,"SWS") out.write(SILENCE) udp_thread = threading.Thread(target=udp_listener) udp_thread.daemon = True udp_thread.start() while True: msg_type, data = recv_message(tls) if msg_type is None: print("edjn",msg_type) break name = MSG_TYPES.get(msg_type, f"Unknown({msg_type})") print(f"Received: {name} | size: {len(data)}B",msg_type) # parse specific messages if msg_type == 0: # Version v = Mumble_pb2.Version() v.ParseFromString(data) print(f" Server version: {v.release}") elif msg_type == 4: # Reject r = Mumble_pb2.Reject() r.ParseFromString(data) print(f" REJECTED: {r.reason}") break elif msg_type == 5: # ServerSync — fully connected! s = Mumble_pb2.ServerSync() s.ParseFromString(data) print(f" Session ID: {s.session}") print(f" Welcome: {s.welcome_text}") print(" FULLY CONNECTED TO MUMBLE! ✅") elif msg_type == 15: # CryptSetup — encryption keys! global KEY, SERVER_NONCE KEY = None SERVER_NONCE = None c = Mumble_pb2.CryptSetup() c.ParseFromString(data) KEY = bytes(c.key) SERVER_NONCE = bytearray(c.server_nonce) print(f" KEY: {c.key.hex()}") print(f" CLIENT_NONCE: {c.client_nonce.hex()}") print(f" SERVER_NONCE: {c.server_nonce.hex()}")