Benutzer:Grey/UDP-Testskript: Unterschied zwischen den Versionen
(→netdate-Beispiel) |
(→netdate-Beispiel) |
||
Zeile 22: | Zeile 22: | ||
− | [[ | + | ---- |
+ | |||
+ | Nun wollen wir uns das Testskript im detail anschauen und von Anfang an aufbauen, am Ende ist der Link zum kompletten Source-Code vom netdate-Skript. | ||
+ | |||
+ | Als erstes müssen wir unsere wrapper-Klassen(mit dem Präfix ''fwt_'' benannt) und andere benötigte Klassen importieren: | ||
+ | |||
+ | import fwt_ip //wrapper-Klasse zum erstellen/ändern von IP-Paketen | ||
+ | import fwt_udp //wrapper-Klasse zum erstellen/ändern von UDP-Paketen | ||
+ | import fwt_tcp //wrapper-Klasse zum erstellen/ändern von TCP-Paketen | ||
+ | import posix //'''TODO''': Erklärung | ||
+ | |||
+ | |||
+ | |||
+ | |||
+ | ACHEN='10.000.2.2' | ||
+ | BCHEN='10.128.2.2' | ||
+ | |||
+ | |||
+ | FILTER_A="not tcp port 1500 and src " + BCHEN + " and dst " + ACHEN + " or icmp" | ||
+ | FILTER_B="not tcp port 1500 and src " + ACHEN + " and dst " + BCHEN + " or icmp" | ||
+ | |||
+ | |||
+ | ip_packet = fwt_ip.Packet(fwt_ip.defaultPacket) | ||
+ | |||
+ | |||
+ | |||
+ | ip_packet.set ({ | ||
+ | fwt_ip.HEADER_PROTOCOL: 17, | ||
+ | }) | ||
+ | |||
+ | |||
+ | ip_packet.fixup (fwt_ip.FIXUP_ALL) | ||
+ | |||
+ | |||
+ | udp_pkt_ab = fwt_udp.Packet(fwt_udp.defaultPacket) | ||
+ | udp_pkt_ba = fwt_udp.Packet(fwt_udp.defaultPacket) | ||
+ | |||
+ | |||
+ | udp_pkt_ab.set({fwt_udp.HEADER_CHECKSUM: 0, | ||
+ | fwt_udp.HEADER_SOURCEPORT: 53, | ||
+ | fwt_udp.HEADER_DESTPORT: 37, | ||
+ | fwt_udp.PAYLOAD: '\x0a'}) | ||
+ | |||
+ | |||
+ | udp_pkt_ba.set({ | ||
+ | fwt_udp.HEADER_CHECKSUM: 0, | ||
+ | fwt_udp.HEADER_SOURCEPORT: 37, | ||
+ | fwt_udp.HEADER_DESTPORT: 53, | ||
+ | fwt_udp.PAYLOAD: '\x69\xdc\x01\x00\x00\x01\x00\x00\x00\x00\x00\x00\x03\x77\x77\x77\x05\x68\x65\x69\x73\x65\x02\x64\x65\x00\x00\x01\x00\x01', | ||
+ | }) | ||
+ | |||
+ | |||
+ | udp_pkt_ab.fixup(fwt_udp.FIXUP_ALL, ACHEN, BCHEN); | ||
+ | udp_pkt_ba.fixup(fwt_udp.FIXUP_ALL, BCHEN, ACHEN); | ||
+ | |||
+ | |||
+ | ip_udp_pkt_ab = fwt_ip.Packet(ip_packet.as_dict()) | ||
+ | |||
+ | ip_udp_pkt_ab.set({ | ||
+ | fwt_ip.HEADER_SOURCEIP: ACHEN, | ||
+ | fwt_ip.HEADER_DESTIP: BCHEN, | ||
+ | fwt_ip.PAYLOAD: udp_pkt_ab.as_str() | ||
+ | }) | ||
+ | |||
+ | ip_udp_pkt_ab.fixup(fwt_ip.FIXUP_ALL) | ||
+ | |||
+ | |||
+ | ip_udp_pkt_ba = fwt_ip.Packet(ip_packet.as_dict()) | ||
+ | |||
+ | |||
+ | ip_udp_pkt_ba.set({ | ||
+ | fwt_ip.HEADER_SOURCEIP: BCHEN, | ||
+ | fwt_ip.HEADER_DESTIP: ACHEN, | ||
+ | fwt_ip.PAYLOAD: udp_pkt_ba.as_str()}) | ||
+ | |||
+ | |||
+ | ip_udp_pkt_ba.fixup(fwt_ip.FIXUP_ALL) | ||
+ | |||
+ | |||
+ | def printResult (result): | ||
+ | if result[0] is not None: | ||
+ | print 'Received on A:\n%r\n' % result[0] | ||
+ | if result[1] is not None: | ||
+ | print 'Received on B:\n%r\n' % result[1] | ||
+ | if result[0] is None and result[1] is None: | ||
+ | print 'Timeout' | ||
+ | |||
+ | |||
+ | if dry_run == 1: | ||
+ | print 'dry_run: skip all transceive functions' | ||
+ | print 'Would send IP packet a->b:\n%s\n' % repr(ip_udp_pkt_ab.as_dict()) | ||
+ | print 'Would send IP packet b->a:\n%s\n' % repr(ip_udp_pkt_ba.as_dict()) | ||
+ | else: | ||
+ | fwt_ip.set_coarse_filter_and_arp (fwt_ip.IFACE_A, [ ("10.000.0.2", "10.127.255.255")]) | ||
+ | fwt_ip.set_coarse_filter_and_arp (fwt_ip.IFACE_B, [ ("10.128.0.2", "10.255.255.255")]) | ||
+ | |||
+ | if response_only == 1: | ||
+ | print 'Will send IP packet b->a:\n%r\n' % ip_udp_pkt_ba | ||
+ | result = fwt_ip.transceive (fwt_ip.IFACE_B, ip_udp_pkt_ba, filterA=FILTER_A, filterB=FILTER_B, timeoutA=500, timeoutB=0) | ||
+ | printResult (result) | ||
+ | else: | ||
+ | print 'Will send IP packet a->b:\n%r\n' % ip_udp_pkt_ab | ||
+ | result = fwt_ip.transceive (fwt_ip.IFACE_A, ip_udp_pkt_ab, filterA=FILTER_A, filterB=FILTER_B, timeoutA=0, timeoutB=500) | ||
+ | printResult (result) | ||
+ | |||
+ | if result[1] is not None: | ||
+ | print 'Will send IP packet b->a:\n%r\n' % ip_udp_pkt_ba | ||
+ | result = fwt_ip.transceive (fwt_ip.IFACE_B, ip_udp_pkt_ba, filterA=FILTER_A, filterB=FILTER_B, timeoutA=500, timeoutB=0) | ||
+ | printResult (result) | ||
+ | else: | ||
+ | print 'Lost origin packet -- Skip sending answer' | ||
+ | |||
+ | |||
+ | |||
+ | [http://www.freitagsrunde.org/~grey/sonstiges/std/netdate.py kompletter Code von netdate]. | ||
== transceive-Funktion == | == transceive-Funktion == |
Version vom 28. September 2005, 13:28 Uhr
Inhaltsverzeichnis
UDP-Testskript
Versuchsaufbau
Es gibt verschiedene Varianten des Aufrufs von FWTEST und FWAGENT, näheres hier: Versuchsaufbauten
netdate-Beispiel
An Hand eines Beispiels wollen wir die Implementation eines Testskripts darstellen. Wir wollen zum Beipiel den netdate-Dienst von www.heise.de in Anspruch nehmen (wir nehmen den Dienst von Heise nicht wirklich in Anspruch, sondern simulieren lediglich den Traffic) und uns die aktuelle Uhrzeit zurückgeben lassen. Netdate funktioniert auf verschiedenen Protokollen, wir nehmen mal die Variante mit UDP-Paketen.
Wenn man also mit Ethereal oder tcpdump den Verkehr belauschen würde, sollen die Pakete so aussehen als wären sie echt und nicht von uns erstellt (mehr dazu unter: Grenzen von Tests).
Im Grunde arbeiten wir folgende Punkte nacheinander ab:
- default IP-Paket(Achen -> Bchen) erstellen
- default IP-Paket(Bchen -> Achen) erstellen
- default UDP-Paket(Achen -> Bchen) erstellen
- default UDP-Paket(Bchen -> Achen) erstellen
- IP-Pakete anpassen
- UDP-Pakete anpassen
- IP- und UDP-Pakete zusammenfügen
- UDP/IP-Pakete mittels transceive-Funktion versenden
- Auswertung
Nun wollen wir uns das Testskript im detail anschauen und von Anfang an aufbauen, am Ende ist der Link zum kompletten Source-Code vom netdate-Skript.
Als erstes müssen wir unsere wrapper-Klassen(mit dem Präfix fwt_ benannt) und andere benötigte Klassen importieren:
import fwt_ip //wrapper-Klasse zum erstellen/ändern von IP-Paketen import fwt_udp //wrapper-Klasse zum erstellen/ändern von UDP-Paketen import fwt_tcp //wrapper-Klasse zum erstellen/ändern von TCP-Paketen import posix //TODO: Erklärung
ACHEN='10.000.2.2' BCHEN='10.128.2.2'
FILTER_A="not tcp port 1500 and src " + BCHEN + " and dst " + ACHEN + " or icmp" FILTER_B="not tcp port 1500 and src " + ACHEN + " and dst " + BCHEN + " or icmp"
ip_packet = fwt_ip.Packet(fwt_ip.defaultPacket)
ip_packet.set ({ fwt_ip.HEADER_PROTOCOL: 17, })
ip_packet.fixup (fwt_ip.FIXUP_ALL)
udp_pkt_ab = fwt_udp.Packet(fwt_udp.defaultPacket) udp_pkt_ba = fwt_udp.Packet(fwt_udp.defaultPacket)
udp_pkt_ab.set({fwt_udp.HEADER_CHECKSUM: 0, fwt_udp.HEADER_SOURCEPORT: 53, fwt_udp.HEADER_DESTPORT: 37, fwt_udp.PAYLOAD: '\x0a'})
udp_pkt_ba.set({ fwt_udp.HEADER_CHECKSUM: 0, fwt_udp.HEADER_SOURCEPORT: 37, fwt_udp.HEADER_DESTPORT: 53, fwt_udp.PAYLOAD: '\x69\xdc\x01\x00\x00\x01\x00\x00\x00\x00\x00\x00\x03\x77\x77\x77\x05\x68\x65\x69\x73\x65\x02\x64\x65\x00\x00\x01\x00\x01', })
udp_pkt_ab.fixup(fwt_udp.FIXUP_ALL, ACHEN, BCHEN); udp_pkt_ba.fixup(fwt_udp.FIXUP_ALL, BCHEN, ACHEN);
ip_udp_pkt_ab = fwt_ip.Packet(ip_packet.as_dict())
ip_udp_pkt_ab.set({ fwt_ip.HEADER_SOURCEIP: ACHEN, fwt_ip.HEADER_DESTIP: BCHEN, fwt_ip.PAYLOAD: udp_pkt_ab.as_str() }) ip_udp_pkt_ab.fixup(fwt_ip.FIXUP_ALL)
ip_udp_pkt_ba = fwt_ip.Packet(ip_packet.as_dict())
ip_udp_pkt_ba.set({ fwt_ip.HEADER_SOURCEIP: BCHEN, fwt_ip.HEADER_DESTIP: ACHEN, fwt_ip.PAYLOAD: udp_pkt_ba.as_str()})
ip_udp_pkt_ba.fixup(fwt_ip.FIXUP_ALL)
def printResult (result): if result[0] is not None: print 'Received on A:\n%r\n' % result[0] if result[1] is not None: print 'Received on B:\n%r\n' % result[1] if result[0] is None and result[1] is None: print 'Timeout'
if dry_run == 1: print 'dry_run: skip all transceive functions' print 'Would send IP packet a->b:\n%s\n' % repr(ip_udp_pkt_ab.as_dict()) print 'Would send IP packet b->a:\n%s\n' % repr(ip_udp_pkt_ba.as_dict()) else: fwt_ip.set_coarse_filter_and_arp (fwt_ip.IFACE_A, [ ("10.000.0.2", "10.127.255.255")]) fwt_ip.set_coarse_filter_and_arp (fwt_ip.IFACE_B, [ ("10.128.0.2", "10.255.255.255")])
if response_only == 1: print 'Will send IP packet b->a:\n%r\n' % ip_udp_pkt_ba result = fwt_ip.transceive (fwt_ip.IFACE_B, ip_udp_pkt_ba, filterA=FILTER_A, filterB=FILTER_B, timeoutA=500, timeoutB=0) printResult (result) else: print 'Will send IP packet a->b:\n%r\n' % ip_udp_pkt_ab result = fwt_ip.transceive (fwt_ip.IFACE_A, ip_udp_pkt_ab, filterA=FILTER_A, filterB=FILTER_B, timeoutA=0, timeoutB=500) printResult (result)
if result[1] is not None: print 'Will send IP packet b->a:\n%r\n' % ip_udp_pkt_ba result = fwt_ip.transceive (fwt_ip.IFACE_B, ip_udp_pkt_ba, filterA=FILTER_A, filterB=FILTER_B, timeoutA=500, timeoutB=0) printResult (result) else: print 'Lost origin packet -- Skip sending answer'