udp_experimental_recorder/sniffer.py
George 80e36c9df8 feat:
--Added Auth for mumble server
2026-05-05 16:29:56 +05:30

119 lines
3.3 KiB
Python

import ssl
import socket
import struct
import Mumble_pb2
import threading
import time
# Endpoint of the Mumble server's TCP control channel.
SERVER_IP = "172.16.11.170"
SERVER_PORT = 64738

# TLS client context with certificate and hostname checks switched off, so
# any server certificate (e.g. a self-signed one) is accepted.
# check_hostname has to be cleared before verify_mode is set to CERT_NONE.
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE

# Plain TCP socket, wrapped in TLS before connecting; the connect() call on
# the wrapped socket then opens the connection to the server.
sock = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)
tls = ctx.wrap_socket(sock, server_hostname=SERVER_IP)
tls.connect((SERVER_IP, SERVER_PORT))

# Report what was negotiated.
print("TLS connected to Mumble server!")
print(f"Cipher: {tls.cipher()}")
print(f"TLS version: {tls.version()}")
def send_message(tls, msg_type, message):
    """Frame *message* and write it to the control connection.

    The frame is a 6-byte big-endian header (2-byte message type followed by
    a 4-byte payload length) and then the serialized payload.

    Args:
        tls: Connected socket-like object providing ``sendall``.
        msg_type: Numeric message type placed in the header.
        message: Object exposing ``SerializeToString()`` that returns the
            payload bytes.
    """
    payload = message.SerializeToString()
    header = struct.pack(">HI", msg_type, len(payload))
    # sendall() keeps writing until the whole frame is out; send() may stop
    # after a partial write and leave a truncated frame on the wire.
    tls.sendall(header + payload)
    print(f"Sent message type: {msg_type} | size: {len(payload)}B")
def _recv_exact(tls, count):
    """Read exactly *count* bytes from *tls*; return None if the peer closes first."""
    chunks = []
    remaining = count
    while remaining > 0:
        chunk = tls.recv(remaining)
        if not chunk:
            # Empty read means the connection ended before the full amount arrived.
            return None
        chunks.append(chunk)
        remaining -= len(chunk)
    return b"".join(chunks)


def recv_message(tls):
    """Read one framed message from the control connection.

    A frame is a 6-byte big-endian header (2-byte message type, 4-byte
    payload length) followed by that many payload bytes. A single ``recv``
    call may return fewer bytes than requested, so both the header and the
    payload are read in a loop until complete.

    Args:
        tls: Connected socket-like object providing ``recv``.

    Returns:
        ``(msg_type, payload_bytes)`` for a complete frame, or
        ``(None, None)`` when the connection ends before a full frame
        has been received.
    """
    header = _recv_exact(tls, 6)
    if header is None:
        return None, None
    msg_type, length = struct.unpack(">HI", header)
    data = _recv_exact(tls, length)
    if data is None:
        return None, None
    return msg_type, data
def send_ping(tls):
    """Send a Ping message over *tls* every five seconds, forever.

    Each ping carries the current Unix time (whole seconds) as its
    timestamp. The function never returns; it is started as the target of
    a background thread.

    Args:
        tls: Connected socket passed through to ``send_message``.
    """
    ping_type = 3  # type 3 = Ping
    interval_seconds = 5
    while True:
        keepalive = Mumble_pb2.Ping()
        keepalive.timestamp = int(time.time())
        send_message(tls, ping_type, keepalive)
        print("Ping sent!")
        time.sleep(interval_seconds)
# Handshake step 1: announce the client version (message type 0 = Version).
version = Mumble_pb2.Version(
    version_v1=(1 << 16) | (4 << 8) | 0,  # 1.4.0 packed as major/minor/patch
    release="1.4.0",
    os="Linux",
    os_version="Ubuntu",
)
send_message(tls, 0, version)
print("Sent Version!")

# Handshake step 2: authenticate (message type 2 = Authenticate).
auth = Mumble_pb2.Authenticate(
    username="PythonClient",
    password="",  # put the server password here if one is required
    opus=True,
)
send_message(tls, 2, auth)
print("Sent Authenticate!")

print("\nListening for server messages...")
# Mumble TCP control-channel message type IDs mapped to their names, used
# for logging received messages. Fixes the former entry 24 -> "TextMessage":
# TextMessage is type 11 and type 24 is ServerConfig.
MSG_TYPES = {
    0: "Version",
    1: "UDPTunnel",
    2: "Authenticate",
    3: "Ping",
    4: "Reject",
    5: "ServerSync",
    6: "ChannelRemove",
    7: "ChannelState",
    8: "UserRemove",
    9: "UserState",
    10: "BanList",
    11: "TextMessage",
    12: "PermissionDenied",
    13: "ACL",
    14: "QueryUsers",
    15: "CryptSetup",
    16: "ContextActionModify",
    17: "ContextAction",
    18: "UserList",
    19: "VoiceTarget",
    20: "PermissionQuery",
    21: "CodecVersion",
    22: "UserStats",
    23: "RequestBlob",
    24: "ServerConfig",
    25: "SuggestConfig",
    26: "PluginDataTransmission",
}
# Launch the periodic pinger before entering the receive loop. It runs as a
# daemon thread so it does not keep the process alive once the main program
# exits.
ping_thread = threading.Thread(target=send_ping, args=(tls,), daemon=True)
ping_thread.start()
# Main receive loop: read framed messages from the server, log each one, and
# decode the handful of types this script cares about. The loop ends when the
# connection drops or the server rejects the login.
try:
    while True:
        msg_type, data = recv_message(tls)
        if msg_type is None:
            # recv_message returns (None, None) when no complete message
            # could be read from the connection.
            print("Connection closed: no complete message received")
            break
        name = MSG_TYPES.get(msg_type, f"Unknown({msg_type})")
        print(f"Received: {name} | size: {len(data)}B", msg_type)
        # parse specific messages
        if msg_type == 0:  # Version
            v = Mumble_pb2.Version()
            v.ParseFromString(data)
            print(f" Server version: {v.release}")
        elif msg_type == 4:  # Reject
            r = Mumble_pb2.Reject()
            r.ParseFromString(data)
            print(f" REJECTED: {r.reason}")
            break
        elif msg_type == 5:  # ServerSync — fully connected!
            s = Mumble_pb2.ServerSync()
            s.ParseFromString(data)
            print(f" Session ID: {s.session}")
            print(f" Welcome: {s.welcome_text}")
            print(" FULLY CONNECTED TO MUMBLE! ✅")
        elif msg_type == 15:  # CryptSetup — encryption keys!
            # NOTE: this prints the key and nonces in clear text to stdout.
            c = Mumble_pb2.CryptSetup()
            c.ParseFromString(data)
            print(f" KEY: {c.key.hex()}")
            print(f" CLIENT_NONCE: {c.client_nonce.hex()}")
            print(f" SERVER_NONCE: {c.server_nonce.hex()}")
finally:
    # Release the TLS socket however the loop ends (disconnect, Reject,
    # Ctrl-C, or an exception while parsing a message).
    tls.close()