#!/usr/bin/env python3 # SPDX-License-Identifier: BSD-3-Clause from scapy.all import * import unittest import pkttest SRC_ADDR = "1111:0000:0000:0000:0000:0000:0000:0001" DST_ADDR = "2222:0000:0000:0000:0000:0000:0000:0001" SRC_NET = "1111:0000:0000:0000:0000:0000:0000:0000/64" DST_NET = "2222:0000:0000:0000:0000:0000:0000:0000/64" def config(): return """ sp ipv6 out esp protect 5 pri 1 \\ src {0} \\ dst {1} \\ sport 0:65535 dport 0:65535 sp ipv6 in esp protect 6 pri 1 \\ src {1} \\ dst {0} \\ sport 0:65535 dport 0:65535 sa out 5 cipher_algo null auth_algo null mode transport sa in 6 cipher_algo null auth_algo null mode transport rt ipv6 dst {0} port 1 rt ipv6 dst {1} port 0 """.format(SRC_NET, DST_NET) class TestTransportWithIPv6Ext(unittest.TestCase): # There is a bug in the IPsec Scapy implementation # which causes invalid packet reconstruction after # successful decryption. This method is a workaround. @staticmethod def decrypt(pkt, sa): esp = pkt[ESP] # decrypt dummy packet with no extensions d = sa.decrypt(IPv6()/esp) # fix 'next header' in the preceding header of the original # packet and remove ESP pkt[ESP].underlayer.nh = d[IPv6].nh pkt[ESP].underlayer.remove_payload() # combine L3 header with decrypted payload npkt = pkt/d[IPv6].payload # fix length npkt[IPv6].plen = d[IPv6].plen + len(pkt[IPv6].payload) return npkt def setUp(self): self.px = pkttest.PacketXfer() self.outb_sa = SecurityAssociation(ESP, spi=5) self.inb_sa = SecurityAssociation(ESP, spi=6) def test_outb_ipv6_noopt(self): pkt = IPv6(src=SRC_ADDR, dst=DST_ADDR) pkt /= UDP(sport=123,dport=456)/Raw(load="abc") # send and check response resp = self.px.xfer_unprotected(pkt) self.assertEqual(resp[IPv6].nh, socket.IPPROTO_ESP) self.assertEqual(resp[ESP].spi, 5) # decrypt response, check packet after decryption d = TestTransportWithIPv6Ext.decrypt(resp[IPv6], self.outb_sa) self.assertEqual(d[IPv6].nh, socket.IPPROTO_UDP) self.assertEqual(d[UDP].sport, 123) self.assertEqual(d[UDP].dport, 456) self.assertEqual(bytes(d[UDP].payload), b'abc') def test_outb_ipv6_opt(self): hoptions = [] hoptions.append(RouterAlert(value=2)) hoptions.append(Jumbo(jumboplen=5000)) hoptions.append(Pad1()) doptions = [] doptions.append(HAO(hoa="1234::4321")) pkt = IPv6(src=SRC_ADDR, dst=DST_ADDR) pkt /= IPv6ExtHdrHopByHop(options=hoptions) pkt /= IPv6ExtHdrRouting(addresses=["3333::3","4444::4"]) pkt /= IPv6ExtHdrDestOpt(options=doptions) pkt /= UDP(sport=123,dport=456)/Raw(load="abc") # send and check response resp = self.px.xfer_unprotected(pkt) self.assertEqual(resp[IPv6].nh, socket.IPPROTO_HOPOPTS) # check extensions self.assertEqual(resp[IPv6ExtHdrHopByHop].nh, socket.IPPROTO_ROUTING) self.assertEqual(resp[IPv6ExtHdrRouting].nh, socket.IPPROTO_DSTOPTS) self.assertEqual(resp[IPv6ExtHdrDestOpt].nh, socket.IPPROTO_ESP) # check ESP self.assertEqual(resp[ESP].spi, 5) # decrypt response, check packet after decryption d = TestTransportWithIPv6Ext.decrypt(resp[IPv6], self.outb_sa) self.assertEqual(d[IPv6].nh, socket.IPPROTO_HOPOPTS) self.assertEqual(d[IPv6ExtHdrHopByHop].nh, socket.IPPROTO_ROUTING) self.assertEqual(d[IPv6ExtHdrRouting].nh, socket.IPPROTO_DSTOPTS) self.assertEqual(d[IPv6ExtHdrDestOpt].nh, socket.IPPROTO_UDP) # check UDP self.assertEqual(d[UDP].sport, 123) self.assertEqual(d[UDP].dport, 456) self.assertEqual(bytes(d[UDP].payload), b'abc') def test_inb_ipv6_noopt(self): # encrypt and send raw UDP packet pkt = IPv6(src=DST_ADDR, dst=SRC_ADDR) pkt /= UDP(sport=123,dport=456)/Raw(load="abc") e = self.inb_sa.encrypt(pkt) # send and check response resp = self.px.xfer_protected(e) self.assertEqual(resp[IPv6].nh, socket.IPPROTO_UDP) # check UDP packet self.assertEqual(resp[UDP].sport, 123) self.assertEqual(resp[UDP].dport, 456) self.assertEqual(bytes(resp[UDP].payload), b'abc') def test_inb_ipv6_opt(self): hoptions = [] hoptions.append(RouterAlert(value=2)) hoptions.append(Jumbo(jumboplen=5000)) hoptions.append(Pad1()) doptions = [] doptions.append(HAO(hoa="1234::4321")) # prepare packet with options pkt = IPv6(src=DST_ADDR, dst=SRC_ADDR) pkt /= IPv6ExtHdrHopByHop(options=hoptions) pkt /= IPv6ExtHdrRouting(addresses=["3333::3","4444::4"]) pkt /= IPv6ExtHdrDestOpt(options=doptions) pkt /= UDP(sport=123,dport=456)/Raw(load="abc") e = self.inb_sa.encrypt(pkt) # self encrypted packet and check response resp = self.px.xfer_protected(e) self.assertEqual(resp[IPv6].nh, socket.IPPROTO_HOPOPTS) self.assertEqual(resp[IPv6ExtHdrHopByHop].nh, socket.IPPROTO_ROUTING) self.assertEqual(resp[IPv6ExtHdrRouting].nh, socket.IPPROTO_DSTOPTS) self.assertEqual(resp[IPv6ExtHdrDestOpt].nh, socket.IPPROTO_UDP) # check UDP self.assertEqual(resp[UDP].sport, 123) self.assertEqual(resp[UDP].dport, 456) self.assertEqual(bytes(resp[UDP].payload), b'abc') def test_inb_ipv6_frag(self): # prepare ESP payload pkt = IPv6()/UDP(sport=123,dport=456)/Raw(load="abc") e = self.inb_sa.encrypt(pkt) # craft and send inbound packet e = IPv6(src=DST_ADDR, dst=SRC_ADDR)/IPv6ExtHdrFragment()/e[IPv6].payload resp = self.px.xfer_protected(e) # check response self.assertEqual(resp[IPv6].nh, socket.IPPROTO_FRAGMENT) self.assertEqual(resp[IPv6ExtHdrFragment].nh, socket.IPPROTO_UDP) # check UDP self.assertEqual(resp[UDP].sport, 123) self.assertEqual(resp[UDP].dport, 456) self.assertEqual(bytes(resp[UDP].payload), b'abc') pkttest.pkttest()