Source code for pcap

# -*- coding: utf-8 -*-
#
# Copyright (c) 2022 CSIRO Space and Astronomy.
#
# Distributed under the terms of the CSIRO Open Source Software Licence
# Agreement. See LICENSE for more info.
"""
PCAP Processing Helper Functions
"""
import os
import time
import typing

import dpkt


[docs]def get_reader( file: typing.BinaryIO, ) -> typing.Union[dpkt.pcap.Reader, dpkt.pcapng.Reader]: """Create a reader for a PCAP(NG) file""" return dpkt.pcap.UniversalReader(file)
def _writepkt_patch(self, pkt, ts): """ Monkey-patch to convert timestamps to floats before writing. Only needed for pcapng format, its Writer multiplies the timestamp by 1e6, which throws a TypeError if passed a Decimal. """ self._original_writepkt(pkt, float(ts))
[docs]def get_writer( file: typing.BinaryIO, packet_size: int = 9000, ) -> typing.Union[dpkt.pcap.Writer, dpkt.pcapng.Writer]: """ Create a writer for a PCAP(NG) file :param file: file object to write to :param packet_size: packet size (Bytes) """ if os.path.splitext(file.name)[1] == ".pcapng": if not hasattr(dpkt.pcapng.Writer, "_original_writepkt"): # monkey-patch the writer to do a type conversion on the timestamp dpkt.pcapng.Writer._original_writepkt = dpkt.pcapng.Writer.writepkt dpkt.pcapng.Writer.writepkt = _writepkt_patch writer = dpkt.pcapng.Writer(file, snaplen=packet_size) else: writer = dpkt.pcap.Writer(file, snaplen=packet_size, nano=True) return writer
[docs]def packet_size_from_pcap(in_filename: str) -> int: """ Get the packet size from a given PCAP(NG) file. Note: only inspects the first packet! :param str in_filename: path to file :return: packet size (Bytes) """ with open(in_filename, "rb") as in_file: reader = get_reader(in_file) for timestamp, packet in reader: # assess first packet, # assume all packets are same size return len(packet) return 0 # in case it failed to open file
[docs]def count_packets_in_pcap(in_filename: str) -> int: """ Count the total number of packets from a given PCAP(NG) file. """ with open(in_filename, "rb") as in_file: reader = get_reader(in_file) for n, (timestamp, packet) in enumerate(reader): if n % 1000 == 0: # brief sleep to give the control system a chance to do things time.sleep(0.0001) return n + 1 return 0 # in case it failed to open file
[docs]def eth_from_sll(sll_bytes: bytes) -> bytes: """ Convert an SLL ("cooked") PCAP packet into an Ethernet one """ sll = dpkt.sll.SLL(sll_bytes) eth = dpkt.ethernet.Ethernet() eth.data = sll.data.pack() if sll.hlen == 6: # Ethernet MAC address is 6 bytes eth.src = sll.hdr[: sll.hlen] return eth.pack()