f-stack/dpdk/examples/ipsec-secgw/test/trs_ipv6opts.py

183 lines
6.1 KiB
Python
Executable File

#!/usr/bin/env python3
# SPDX-License-Identifier: BSD-3-Clause
from scapy.all import *
import unittest
import pkttest
SRC_ADDR = "1111:0000:0000:0000:0000:0000:0000:0001"
DST_ADDR = "2222:0000:0000:0000:0000:0000:0000:0001"
SRC_NET = "1111:0000:0000:0000:0000:0000:0000:0000/64"
DST_NET = "2222:0000:0000:0000:0000:0000:0000:0000/64"
def config():
return """
sp ipv6 out esp protect 5 pri 1 \\
src {0} \\
dst {1} \\
sport 0:65535 dport 0:65535
sp ipv6 in esp protect 6 pri 1 \\
src {1} \\
dst {0} \\
sport 0:65535 dport 0:65535
sa out 5 cipher_algo null auth_algo null mode transport
sa in 6 cipher_algo null auth_algo null mode transport
rt ipv6 dst {0} port 1
rt ipv6 dst {1} port 0
""".format(SRC_NET, DST_NET)
class TestTransportWithIPv6Ext(unittest.TestCase):
# There is a bug in the IPsec Scapy implementation
# which causes invalid packet reconstruction after
# successful decryption. This method is a workaround.
@staticmethod
def decrypt(pkt, sa):
esp = pkt[ESP]
# decrypt dummy packet with no extensions
d = sa.decrypt(IPv6()/esp)
# fix 'next header' in the preceding header of the original
# packet and remove ESP
pkt[ESP].underlayer.nh = d[IPv6].nh
pkt[ESP].underlayer.remove_payload()
# combine L3 header with decrypted payload
npkt = pkt/d[IPv6].payload
# fix length
npkt[IPv6].plen = d[IPv6].plen + len(pkt[IPv6].payload)
return npkt
def setUp(self):
self.px = pkttest.PacketXfer()
self.outb_sa = SecurityAssociation(ESP, spi=5)
self.inb_sa = SecurityAssociation(ESP, spi=6)
def test_outb_ipv6_noopt(self):
pkt = IPv6(src=SRC_ADDR, dst=DST_ADDR)
pkt /= UDP(sport=123,dport=456)/Raw(load="abc")
# send and check response
resp = self.px.xfer_unprotected(pkt)
self.assertEqual(resp[IPv6].nh, socket.IPPROTO_ESP)
self.assertEqual(resp[ESP].spi, 5)
# decrypt response, check packet after decryption
d = TestTransportWithIPv6Ext.decrypt(resp[IPv6], self.outb_sa)
self.assertEqual(d[IPv6].nh, socket.IPPROTO_UDP)
self.assertEqual(d[UDP].sport, 123)
self.assertEqual(d[UDP].dport, 456)
self.assertEqual(bytes(d[UDP].payload), b'abc')
def test_outb_ipv6_opt(self):
hoptions = []
hoptions.append(RouterAlert(value=2))
hoptions.append(Jumbo(jumboplen=5000))
hoptions.append(Pad1())
doptions = []
doptions.append(HAO(hoa="1234::4321"))
pkt = IPv6(src=SRC_ADDR, dst=DST_ADDR)
pkt /= IPv6ExtHdrHopByHop(options=hoptions)
pkt /= IPv6ExtHdrRouting(addresses=["3333::3","4444::4"])
pkt /= IPv6ExtHdrDestOpt(options=doptions)
pkt /= UDP(sport=123,dport=456)/Raw(load="abc")
# send and check response
resp = self.px.xfer_unprotected(pkt)
self.assertEqual(resp[IPv6].nh, socket.IPPROTO_HOPOPTS)
# check extensions
self.assertEqual(resp[IPv6ExtHdrHopByHop].nh, socket.IPPROTO_ROUTING)
self.assertEqual(resp[IPv6ExtHdrRouting].nh, socket.IPPROTO_DSTOPTS)
self.assertEqual(resp[IPv6ExtHdrDestOpt].nh, socket.IPPROTO_ESP)
# check ESP
self.assertEqual(resp[ESP].spi, 5)
# decrypt response, check packet after decryption
d = TestTransportWithIPv6Ext.decrypt(resp[IPv6], self.outb_sa)
self.assertEqual(d[IPv6].nh, socket.IPPROTO_HOPOPTS)
self.assertEqual(d[IPv6ExtHdrHopByHop].nh, socket.IPPROTO_ROUTING)
self.assertEqual(d[IPv6ExtHdrRouting].nh, socket.IPPROTO_DSTOPTS)
self.assertEqual(d[IPv6ExtHdrDestOpt].nh, socket.IPPROTO_UDP)
# check UDP
self.assertEqual(d[UDP].sport, 123)
self.assertEqual(d[UDP].dport, 456)
self.assertEqual(bytes(d[UDP].payload), b'abc')
def test_inb_ipv6_noopt(self):
# encrypt and send raw UDP packet
pkt = IPv6(src=DST_ADDR, dst=SRC_ADDR)
pkt /= UDP(sport=123,dport=456)/Raw(load="abc")
e = self.inb_sa.encrypt(pkt)
# send and check response
resp = self.px.xfer_protected(e)
self.assertEqual(resp[IPv6].nh, socket.IPPROTO_UDP)
# check UDP packet
self.assertEqual(resp[UDP].sport, 123)
self.assertEqual(resp[UDP].dport, 456)
self.assertEqual(bytes(resp[UDP].payload), b'abc')
def test_inb_ipv6_opt(self):
hoptions = []
hoptions.append(RouterAlert(value=2))
hoptions.append(Jumbo(jumboplen=5000))
hoptions.append(Pad1())
doptions = []
doptions.append(HAO(hoa="1234::4321"))
# prepare packet with options
pkt = IPv6(src=DST_ADDR, dst=SRC_ADDR)
pkt /= IPv6ExtHdrHopByHop(options=hoptions)
pkt /= IPv6ExtHdrRouting(addresses=["3333::3","4444::4"])
pkt /= IPv6ExtHdrDestOpt(options=doptions)
pkt /= UDP(sport=123,dport=456)/Raw(load="abc")
e = self.inb_sa.encrypt(pkt)
# self encrypted packet and check response
resp = self.px.xfer_protected(e)
self.assertEqual(resp[IPv6].nh, socket.IPPROTO_HOPOPTS)
self.assertEqual(resp[IPv6ExtHdrHopByHop].nh, socket.IPPROTO_ROUTING)
self.assertEqual(resp[IPv6ExtHdrRouting].nh, socket.IPPROTO_DSTOPTS)
self.assertEqual(resp[IPv6ExtHdrDestOpt].nh, socket.IPPROTO_UDP)
# check UDP
self.assertEqual(resp[UDP].sport, 123)
self.assertEqual(resp[UDP].dport, 456)
self.assertEqual(bytes(resp[UDP].payload), b'abc')
def test_inb_ipv6_frag(self):
# prepare ESP payload
pkt = IPv6()/UDP(sport=123,dport=456)/Raw(load="abc")
e = self.inb_sa.encrypt(pkt)
# craft and send inbound packet
e = IPv6(src=DST_ADDR, dst=SRC_ADDR)/IPv6ExtHdrFragment()/e[IPv6].payload
resp = self.px.xfer_protected(e)
# check response
self.assertEqual(resp[IPv6].nh, socket.IPPROTO_FRAGMENT)
self.assertEqual(resp[IPv6ExtHdrFragment].nh, socket.IPPROTO_UDP)
# check UDP
self.assertEqual(resp[UDP].sport, 123)
self.assertEqual(resp[UDP].dport, 456)
self.assertEqual(bytes(resp[UDP].payload), b'abc')
pkttest.pkttest()