Source code for fnss.traffic.trafficmatrices

"""Functions and classes for creating and manipulating traffic matrices.

The functions of this module allow users to synthetically generate traffic
matrices with given statistical properties according to models proposed in
literature.

The output of this generation is either a TrafficMatrix or a
TrafficMatrixSequence object.

A traffic matrix or a sequence of matrices can be read from and written to an
XML file with the provided functions.
"""
import itertools
import multiprocessing as mp
from math import exp, sin, pi, log, sqrt
from collections import Counter
import xml.etree.cElementTree as ET

from numpy import isinf
from numpy.random import lognormal, normal
import networkx as nx

from fnss.units import capacity_units, time_units
import fnss.util as util
from fnss.topologies.topology import fan_in_out_capacities, \
                                     od_pairs_from_topology


# Public names exported by ``from fnss.traffic.trafficmatrices import *``.
__all__ = [
    'TrafficMatrix',
    'TrafficMatrixSequence',
    'static_traffic_matrix',
    'stationary_traffic_matrix',
    'sin_cyclostationary_traffic_matrix',
    'read_traffic_matrix',
    'write_traffic_matrix',
    'validate_traffic_matrix',
    'link_loads'
           ]


class TrafficMatrix(object):
    """
    Class representing a single traffic matrix.

    It simply contains a set of traffic volumes being exchanged between
    origin-destination pairs. Internally the volumes are stored in the
    ``flow`` attribute as a nested dictionary: ``flow[origin][destination]``.

    Parameters
    ----------
    volume_unit : str
        The unit in which traffic volumes are expressed
    flows : dict, optional
        The traffic volumes of the matrix. Two layouts are accepted:

        * a dictionary keyed by origin-destination pair, where each pair is
          an ``(origin, destination)`` tuple and each value is a volume;
        * a nested dictionary ``{origin: {destination: volume}}``, which is
          stored as is (no copy is made).

        All volumes are expressed in the same unit.
    """

    def __init__(self, volume_unit='Mbps', flows=None):
        """
        Initialize the traffic matrix

        Raises
        ------
        ValueError : if *volume_unit* is not a known capacity unit
        """
        if volume_unit not in capacity_units:
            raise ValueError("The volume_unit argument is not valid")
        self.attrib = {}
        self.attrib['volume_unit'] = volume_unit
        self.flow = self._nested_flows(flows) if flows is not None else {}

    @staticmethod
    def _nested_flows(flows):
        """
        Return *flows* in the nested ``{origin: {destination: volume}}`` form.

        A dictionary whose values are all dictionaries is taken to be already
        nested and is returned unchanged (same object). Otherwise the keys
        are unpacked as ``(origin, destination)`` tuples.
        """
        if all(isinstance(volumes, dict) for volumes in flows.values()):
            return flows
        nested = {}
        for (origin, destination), volume in flows.items():
            nested.setdefault(origin, {})[destination] = volume
        return nested

    def __iter__(self):
        """
        Iterates over the flows. Use the expression
        'for flow in traffic_matrix'. Each item is an (origin, destination)
        tuple.
        """
        return iter(self.flows())

    def __len__(self):
        """
        Return the number of OD pairs of the matrix. Use the expression
        'len(traffic_matrix)'

        Returns
        -------
        len : int
            The number of OD pairs
        """
        return len(self.flows())

    def __contains__(self, item):
        """
        Test whether a specific OD flow is in the matrix or not. Use the
        expression '(origin, destination) in traffic_matrix'
        """
        origin, destination = item
        return origin in self.flow and destination in self.flow[origin]

    def __getitem__(self, key):
        """
        Return the traffic volume of a specific flow of the matrix. Use the
        expression 'volume = traffic_matrix[(origin, destination)]'
        """
        origin, destination = key
        return self.flow[origin][destination]

    def __setitem__(self, key, value):
        """
        Sets the traffic volume of a specific flow of the matrix. Use the
        expression 'traffic_matrix[(origin, destination)] = volume'
        """
        origin, destination = key
        self.add_flow(origin, destination, value)

    def __delitem__(self, key):
        """
        Delete a specific flow from the matrix. Use the expression
        'del traffic_matrix[(u, v)]'
        """
        origin, destination = key
        del self.flow[origin][destination]
        # Do not leave an origin without destinations behind
        if not self.flow[origin]:
            del self.flow[origin]

    def flows(self):
        """
        Return the flows of the traffic matrix

        Flows whose origin and destination coincide are not included.

        Returns
        -------
        flows : dict
            A dictionary of all traffic volumes keyed by OD pair
        """
        return {(o, d): self.flow[o][d]
                for o in self.flow
                for d in self.flow[o]
                if o != d}

    def od_pairs(self):
        """
        Return all OD pairs of the traffic matrix

        Pairs whose origin and destination coincide are not included.

        Returns
        -------
        od_pairs : list
            A list of OD pairs. Each OD pair is expressed as an
            (origin, destination) tuple
        """
        return [(o, d) for o in self.flow for d in self.flow[o] if o != d]

    def add_flow(self, origin, destination, volume):
        """
        Add a flow to the traffic matrix

        If a flow between the same origin and destination already exists,
        its volume is overwritten.

        Parameters
        ----------
        origin : any hashable type
            The origin node
        destination : any hashable type
            The destination node
        volume : float
            The traffic volume
        """
        if origin not in self.flow:
            self.flow[origin] = {}
        self.flow[origin][destination] = volume

    def pop_flow(self, origin, destination):
        """
        Pop a flow from the traffic matrix and return the volume of the flow
        removed. If the flow to remove does not exist, a KeyError is raised.

        Parameters
        ----------
        origin : any hashable type
            The origin node
        destination : any hashable type
            The destination node

        Raises
        ------
        KeyError: if there is no flow from the given origin to the given
        destination

        Returns
        -------
        volume : float
            The volume of the flow popped from the matrix
        """
        if origin not in self.flow or destination not in self.flow[origin]:
            raise KeyError('There is no flow from %s to %s'
                           % (str(origin), str(destination)))
        volume = self.flow[origin].pop(destination)
        # Same clean-up as __delitem__: drop origins left without flows
        if not self.flow[origin]:
            del self.flow[origin]
        return volume
class TrafficMatrixSequence(object):
    """
    Class representing a sequence of traffic matrices.

    The matrices are kept, in order, in the ``matrix`` list attribute.

    Parameters
    ----------
    interval : float or int, optional
        The time interval elapsed between subsequent traffic matrices of the
        sequence
    t_unit : str, optional
        The unit of the interval value (e.g. 'sec' or 'min')
    """

    def __init__(self, interval=None, t_unit='min'):
        """
        Initialize the traffic matrix sequence

        Parameters
        ----------
        interval : float, optional
            The time interval between subsequent traffic matrices of the
            sequence
        t_unit : str, optional
            The time unit of the time interval. It is validated and stored
            only if *interval* is specified.

        Raises
        ------
        ValueError : if *interval* is given and *t_unit* is not a known time
        unit
        """
        self.attrib = {}
        if interval is not None:
            if t_unit not in time_units:
                raise ValueError("The t_unit argument is not valid")
            self.attrib['interval'] = interval
            self.attrib['t_unit'] = t_unit
        self.matrix = []

    def __iter__(self):
        """
        Iterates over the matrices of the sequence. Use the expression
        'for traffic_matrix in traffic_matrix_sequence'
        """
        return iter(self.matrix)

    def __len__(self):
        """
        Return the number of traffic matrices of the sequence. Use the
        expression 'len(traffic_matrix_sequence)'
        """
        return len(self.matrix)

    def __getitem__(self, key):
        """
        Return the traffic matrix at a specific index of the sequence. Use
        the expression 'traffic_matrix = traffic_matrix_sequence[i]'
        """
        return self.matrix[key]

    def __setitem__(self, key, value):
        """
        Replace the traffic matrix at a specific (existing) index of the
        sequence. Use the expression
        'traffic_matrix_sequence[i] = traffic_matrix'
        """
        self.matrix[key] = value

    def __delitem__(self, key):
        """
        Remove the traffic matrix at a specific index of the sequence. Use
        the expression 'del traffic_matrix_sequence[i]'
        """
        del self.matrix[key]

    def insert(self, i, tm):
        """
        Insert a traffic matrix in the sequence at a specified position

        Parameters
        ----------
        i : int
            The position at which the matrix is inserted
        tm : TrafficMatrix
            The traffic matrix to insert
        """
        self.matrix.insert(i, tm)

    def append(self, tm):
        """
        Append a traffic matrix at the end of the sequence

        Parameters
        ----------
        tm : TrafficMatrix
            The traffic matrix to append
        """
        self.matrix.append(tm)

    def get(self, i):
        """
        Return a specific traffic matrix in a specific position of the
        sequence

        Parameters
        ----------
        i : int
            The index of the traffic matrix

        Returns
        -------
        tm : TrafficMatrix
        """
        return self.matrix[i]

    def pop(self, i):
        """
        Remove the traffic matrix in a specific position of the sequence and
        return it

        Parameters
        ----------
        i : int
            The index of the traffic matrix to remove

        Raises
        ------
        IndexError : if list is empty or index is out of range.

        Returns
        -------
        tm : TrafficMatrix
            The TrafficMatrix popped from the sequence

        Examples
        --------
        >>> import fnss
        >>> tms = fnss.TrafficMatrixSequence()
        >>> tm = fnss.TrafficMatrix()
        >>> tms.append(tm)
        >>> popped = tms.pop(0)
        >>> popped is tm
        True
        """
        return self.matrix.pop(i)
# We assume that links are full duplex, if undirected
def static_traffic_matrix(topology, mean, stddev, max_u=0.9,
                          origin_nodes=None, destination_nodes=None):
    """
    Return a TrafficMatrix object, i.e. a single traffic matrix, representing
    the traffic volume exchanged over a network at a specific point in time

    This matrix is generated by assigning traffic volumes drawn from a
    lognormal distribution and assigned to specific origin-destination pairs
    using the Ranking Metrics Heuristic method proposed by Nucci et al. [1]_

    Parameters
    ----------
    topology : topology
        The topology for which the traffic matrix is calculated. This
        topology can either be directed or undirected. If it is undirected,
        this function assumes that all links are full-duplex.
    mean : float
        The mean volume of traffic among all origin-destination pairs. It
        must be greater than zero.
    stddev : float
        The standard deviation of volumes among all origin-destination pairs.
        It must not be negative.
    max_u : float, optional
        Represent the max link utilization. If specified, traffic volumes are
        scaled so that the most utilized link of the network has an
        utilization equal to max_u. If None, volumes are not scaled, but in
        this case links may end up with an utilization factor greater
        than 1.0
    origin_nodes : list, optional
        A list of all nodes which can be traffic sources. If not specified,
        all nodes of the topology are traffic sources
    destination_nodes : list, optional
        A list of all nodes which can be traffic destinations. If not
        specified, all nodes of the topology are traffic destinations

    Returns
    -------
    tm : TrafficMatrix

    Raises
    ------
    ValueError : if mean or stddev cannot be converted to float, if either is
    negative, if mean is zero, or if some of the generated volumes are
    infinite

    References
    ----------
    .. [1] Nucci et al., The problem of synthetically generating IP traffic
       matrices: initial recommendations, ACM SIGCOMM Computer Communication
       Review, 35(3), 2005
    """
    try:
        mean = float(mean)
        stddev = float(stddev)
    except (TypeError, ValueError):
        # float(None) and similar raise TypeError rather than ValueError
        raise ValueError('mean and stddev must be of type float')
    if mean < 0 or stddev < 0:
        raise ValueError('mean and stddev must be not negative')
    if mean == 0:
        # The lognormal parameters below take log(mean) and divide by mean
        raise ValueError('mean must be greater than zero')
    # Always work on a private, directed copy of the topology
    topology = topology.copy() if topology.is_directed() \
               else topology.to_directed()
    volume_unit = topology.graph['capacity_unit']
    # Parameters of the lognormal distribution having the requested mean and
    # standard deviation
    mu = log(mean ** 2 / sqrt(stddev ** 2 + mean ** 2))
    sigma = sqrt(log((stddev ** 2 / mean ** 2) + 1))
    if origin_nodes is None and destination_nodes is None:
        od_pairs = od_pairs_from_topology(topology)
    else:
        all_nodes = topology.nodes()
        origins = origin_nodes or all_nodes
        destinations = destination_nodes or all_nodes
        od_pairs = [(o, d) for o in origins for d in destinations if o != d]
    nr_pairs = len(od_pairs)
    volumes = sorted(lognormal(mu, sigma, size=nr_pairs))
    if any(isinf(vol) for vol in volumes):
        raise ValueError('Some volumes are too large to be handled by a '
                         'float type. Set a lower value of mu and try again.')
    sorted_od_pairs = __ranking_metrics_heuristic(topology, od_pairs)
    # Volumes are sorted in ascending order and paired with the ranked OD
    # pairs
    assignments = dict(zip(sorted_od_pairs, volumes))
    # check if the matrix matches and scale if needed
    if max_u is not None:
        if origin_nodes is not None:
            # Iterate over `origins` (not `origin_nodes`) so that an empty
            # origin_nodes list, which falls back to all nodes above, is
            # handled consistently here too
            shortest_path = dict(
                    (node, nx.single_source_dijkstra_path(topology,
                                                          node,
                                                          weight='weight'))
                    for node in origins)
            # remove OD pairs not connected
            # NOTE(review): the pairs dropped here keep their volume in
            # `assignments` and hence in the returned matrix; they are only
            # excluded from the link load calculation - confirm intended.
            od_pairs = [(o, d) for o, d in od_pairs if d in shortest_path[o]]
        else:
            shortest_path = dict(nx.all_pairs_dijkstra_path(topology,
                                                            weight='weight'))
        for u, v in topology.edges():
            topology.adj[u][v]['load'] = 0.0
        # Find max u
        for o, d in od_pairs:
            path = shortest_path[o][d]
            if len(path) > 1:
                for u, v in zip(path[:-1], path[1:]):
                    topology.adj[u][v]['load'] += assignments[(o, d)]
        # Calculate scaling
        current_max_u = max((float(topology.adj[u][v]['load'])
                             / float(topology.adj[u][v]['capacity'])
                             for u, v in topology.edges()))
        norm_factor = max_u / current_max_u
        for od_pair in assignments:
            assignments[od_pair] *= norm_factor

    # write to traffic matrix
    traffic_matrix = TrafficMatrix(volume_unit=volume_unit)
    for (o, d), flow in assignments.items():
        traffic_matrix.add_flow(o, d, flow)
    return traffic_matrix
def stationary_traffic_matrix(topology, mean, stddev, gamma, log_psi, n,
                              max_u=0.9,
                              origin_nodes=None, destination_nodes=None):
    """
    Return a stationary sequence of traffic matrices.

    A single base matrix is generated first: its volumes are drawn from a
    lognormal distribution and mapped to origin-destination pairs with the
    Ranking Metrics Heuristic method proposed by Nucci et al. [2]_. Every
    matrix of the sequence is then obtained by adding zero-mean normal
    fluctuations to the volumes of the base matrix (negative results are
    clipped to 0). This process was originally proposed by [2]_

    Stationary sequences of traffic matrices are generally suitable for
    modeling network traffic over short periods (up to 1.5 hours). Over
    longer periods, real traffic exhibits diurnal patterns and they are
    better modelled by cyclostationary sequences

    Parameters
    ----------
    topology : topology
        The topology for which the traffic matrix is calculated. This
        topology can either be directed or undirected. If it is undirected,
        this function assumes that all links are full-duplex.
    mean : float
        The mean volume of traffic among all origin-destination pairs
    stddev : float
        The standard deviation of volumes among all origin-destination pairs.
    gamma : float
        Parameter expressing relation between mean and standard deviation of
        traffic volumes of a specific flow over the time
    log_psi : float
        Parameter expressing relation between mean and standard deviation of
        traffic volumes of a specific flow over the time
    n : int
        Number of matrices in the sequence
    max_u : float, optional
        Represent the max link utilization. If specified, traffic volumes are
        scaled so that the most utilized link of the network has an
        utilization equal to max_u. If None, volumes are not scaled, but in
        this case links may end up with an utilization factor greater
        than 1.0
    origin_nodes : list, optional
        A list of all nodes which can be traffic sources. If not specified
        all nodes of the topology are traffic sources
    destination_nodes : list, optional
        A list of all nodes which can be traffic destinations. If not
        specified all nodes of the topology are traffic destinations

    Returns
    -------
    tms : TrafficMatrixSequence

    Raises
    ------
    ValueError : if log_psi and/or gamma make the standard deviation of the
    random fluctuations infinite

    References
    ----------
    .. [2] Nucci et al., The problem of synthetically generating IP traffic
       matrices: initial recommendations, ACM SIGCOMM Computer Communication
       Review, 35(3), 2005
    """
    tm_sequence = TrafficMatrixSequence()
    # Unscaled base matrix; scaling is applied to the whole sequence later
    base_tm = static_traffic_matrix(topology, mean, stddev, max_u=None,
                                    origin_nodes=origin_nodes,
                                    destination_nodes=destination_nodes)
    volume_unit = base_tm.attrib['volume_unit']
    mean_dict = base_tm.flows()
    psi = exp(log_psi)
    if psi == 0.0:
        raise ValueError("The value of log_psi provided is too small and "
                         "causes psi=0.0, which makes the standard deviation "
                         "of random fluctuation to become infinite. Try with "
                         "a greater value of log_psi")
    # Per-flow standard deviation of the fluctuations
    std_dict = {}
    for pair, volume in mean_dict.items():
        std_dict[pair] = (volume / psi) ** (1.0 / gamma)
    for std in std_dict.values():
        if isinf(std):
            raise ValueError("The value of log_psi or gamma provided are too "
                             "small and causes the standard deviation of "
                             "random fluctuations to become infinite. "
                             "Try with a greater value of log_psi and/or "
                             "gamma")
    # Draw, flow by flow, the n samples of each OD pair (clipped at 0)
    samples = {}
    for pair in mean_dict:
        flow_mean = mean_dict[pair]
        flow_std = std_dict[pair]
        samples[pair] = [max(0, normal(flow_mean, flow_std))
                         for _ in range(n)]

    # Assemble the i-th matrix from the i-th sample of every flow
    for index in range(n):
        matrix = TrafficMatrix(volume_unit=volume_unit)
        for origin, destination in mean_dict:
            matrix.add_flow(origin, destination,
                            samples[(origin, destination)][index])
        tm_sequence.append(matrix)

    if max_u is None:
        return tm_sequence

    if origin_nodes is None:
        shortest_path = dict(nx.all_pairs_dijkstra_path(topology,
                                                        weight='weight'))
    else:
        shortest_path = {}
        for node in origin_nodes:
            shortest_path[node] = nx.single_source_dijkstra_path(
                topology, node, weight='weight')
    # Highest link load observed across all matrices of the sequence
    current_max_u = max(
        max(link_loads(topology, tm_sequence.get(index),
                       shortest_path).values())
        for index in range(n))
    norm_factor = max_u / current_max_u
    for index in range(n):
        scaled = tm_sequence.matrix[index]
        for origin, destination in mean_dict:
            scaled.flow[origin][destination] *= norm_factor
    return tm_sequence
def sin_cyclostationary_traffic_matrix(topology, mean, stddev, gamma, log_psi,
                                       delta=0.2, n=24, periods=1, max_u=0.9,
                                       origin_nodes=None,
                                       destination_nodes=None):
    """
    Return a cyclostationary sequence of traffic matrices, where traffic
    volumes evolve over time as sin waves.

    The sequence is generated by first generating a single matrix assigning
    traffic volumes drawn from a lognormal distribution and assigned to
    specific origin-destination pairs using the Ranking Metrics Heuristic
    method proposed by Nucci et al. [3]_. Then, all matrices of the sequence
    are generated by adding zero-mean normal fluctuation in the traffic
    volumes. Finally, traffic volumes are multiplied by a sin function with
    unitary mean to model periodic fluctuations. This process was originally
    proposed by [3]_.

    Cyclostationary sequences of traffic matrices are generally suitable for
    modeling real network traffic over long periods, up to several days. In
    fact, real traffic exhibits diurnal patterns well modelled by
    cyclostationary sequences.

    Parameters
    ----------
    topology : topology
        The topology for which the traffic matrix is calculated. This
        topology can either be directed or undirected. If it is undirected,
        this function assumes that all links are full-duplex.
    mean : float
        The mean volume of traffic among all origin-destination pairs
    stddev : float
        The standard deviation of volumes among all origin-destination pairs.
    gamma : float
        Parameter expressing relation between mean and standard deviation of
        traffic volumes of a specific flow over the time
    log_psi : float
        Parameter expressing relation between mean and standard deviation of
        traffic volumes of a specific flow over the time
    delta : float [0, 1]
        A parameter indicating the intensity of variation of traffic volumes
        over a period. Specifically, let x be the mean volume over a specific
        OD pair, the minimum and maximum traffic volumes for that OD pair
        (excluding random fluctuations) are respectively
        :math:`x*(1 - delta)` and :math:`x*(1 + delta)`
    n : int
        Number of traffic matrices per period. For example, if it is desired
        to model traffic varying cyclically over a 24 hour period, and n is
        set to 24, the time interval between subsequent traffic matrices is
        1 hour.
    periods : int
        Number of periods. In total the sequence is composed of
        :math:`n * periods` traffic matrices.
    max_u : float, optional
        Represent the max link utilization. If specified, traffic volumes are
        scaled so that the most utilized link of the network has an
        utilization equal to max_u. If None, volumes are not scaled, but in
        this case links may end up with an utilization factor greater
        than 1.0
    origin_nodes : list, optional
        A list of all nodes which can be traffic sources. If not specified
        all nodes of the topology are traffic sources
    destination_nodes : list, optional
        A list of all nodes which can be traffic destinations. If not
        specified all nodes of the topology are traffic destinations

    Returns
    -------
    tms : TrafficMatrixSequence

    Raises
    ------
    ValueError : if log_psi and/or gamma make the standard deviation of the
    random fluctuations infinite

    References
    ----------
    .. [3] Nucci et al., The problem of synthetically generating IP traffic
       matrices: initial recommendations, ACM SIGCOMM Computer Communication
       Review, 35(3), 2005
    """
    tm_sequence = TrafficMatrixSequence()
    # Unscaled base matrix; scaling is applied to the whole sequence later
    static_tm = static_traffic_matrix(topology, mean, stddev, max_u=None,
                                      origin_nodes=origin_nodes,
                                      destination_nodes=destination_nodes)
    volume_unit = static_tm.attrib['volume_unit']
    mean_dict = static_tm.flows()
    psi = exp(log_psi)
    if psi == 0.0:
        raise ValueError("The value of log_psi provided is too small and "
                         "causes psi=0.0, which makes the standard deviation "
                         "of random fluctuation to become infinite. Try with "
                         "a greater value of log_psi")
    std_dict = {(o, d): (m / psi) ** (1.0 / gamma)
                for (o, d), m in mean_dict.items()}
    if any(isinf(std) for std in std_dict.values()):
        raise ValueError("The value of log_psi or gamma provided are too "
                         "small and causes the standard deviation of random "
                         "fluctuations to become infinite. Try with a greater "
                         "value of log_psi and/or gamma")
    od_pairs = static_tm.od_pairs()
    for _ in range(periods):
        for i in range(n):
            # Sinusoidal multiplier with unitary mean for the i-th step of
            # the period; it is the same for all OD pairs
            seasonal_factor = 1 + delta * sin((2 * pi * i) / n)
            tm = TrafficMatrix(volume_unit=volume_unit)
            for o, d in od_pairs:
                volume = static_tm[(o, d)] * seasonal_factor
                # Add the random fluctuation; volumes cannot be negative
                volume = max((0, normal(volume, std_dict[(o, d)])))
                tm.add_flow(o, d, volume)
            tm_sequence.append(tm)
    if max_u is not None:
        if origin_nodes is not None:
            shortest_path = dict(
                    (node, nx.single_source_dijkstra_path(topology,
                                                          node,
                                                          weight='weight'))
                    for node in origin_nodes)
        else:
            shortest_path = dict(nx.all_pairs_dijkstra_path(topology,
                                                            weight='weight'))
        # Highest link load observed across all matrices of the sequence
        current_max_u = max((max(link_loads(topology,
                                            tm_sequence.get(i),
                                            shortest_path
                                            ).values())
                             for i in range(n * periods)))
        norm_factor = max_u / current_max_u
        for i in range(n * periods):
            for o, d in mean_dict:
                tm_sequence.matrix[i].flow[o][d] *= norm_factor
    return tm_sequence
def __ranking_metrics_heuristic(topology, od_pairs=None):
    """
    Sort OD pairs of a topology according to the Ranking Metrics Heuristics
    method

    Parameters
    ----------
    topology : Topology or DirectedTopology
        The topology
    od_pairs : list, optional
        The OD pairs to be ranked (must be a subset of the OD pairs of the
        topology). If None, then the heuristic is calculated for all the OD
        pairs of the topology

    Returns
    -------
    od_pairs : list
        The sorted list of OD pairs
    """
    # Ranking Metrics Heuristic
    if od_pairs is None:
        od_pairs = od_pairs_from_topology(topology)

    fan_in, fan_out = fan_in_out_capacities(topology)
    degree = topology.degree()
    min_capacity = {(u, v): min(fan_out[u], fan_in[v]) for u, v in od_pairs}
    min_degree = {(u, v): min(degree[u], degree[v]) for u, v in od_pairs}

    # NFUR calculation is expensive, so before calculating it, the code
    # checks if it is really needed, i.e. if there are ties after capacity
    # and degree sorting
    cap_deg_pairs = [(min_capacity[(u, v)], min_degree[(u, v)])
                     for u, v in od_pairs]
    nfur_required = any(val > 1 for val in Counter(cap_deg_pairs).values())
    if not nfur_required:
        # Sort all OD_pairs
        return sorted(od_pairs, key=lambda od_pair: (min_capacity[od_pair],
                                                     min_degree[od_pair]))
    # if NFUR is required we calculate it. If fast is True, this function
    # returns the betweenness centrality rather than the NFUR.
    # Use betweenness centrality instead of NFUR if the topology is not
    # trivial for scalability reasons.
    # The threshold of 300 is a conservative value which allows fast
    # execution on most machines.
    parallelize = (topology.number_of_edges() > 100)
    fast = (topology.number_of_edges() > 300)
    nfur = __calc_nfur(topology, fast, parallelize)
    # Note: here we use the opposite of max rather than the inverse of max
    # (which is the formulation of the paper) because we only need to rank
    # in reverse order the max of NFURs. Since all NFURs are >=0,
    # using the opposite yields the same results as the inverse, but there
    # is no risk of incurring in divisions by 0.
    max_inv_nfur = {(u, v): -max(nfur[u], nfur[v]) for u, v in od_pairs}
    # Sort all OD_pairs
    return sorted(od_pairs, key=lambda od_pair: (min_capacity[od_pair],
                                                 min_degree[od_pair],
                                                 max_inv_nfur[od_pair]))


def __calc_nfur(topology, fast, parallelize=True):
    """
    Calculate the Number of Flows under Failure (NFUR) for all nodes of a
    topology

    Parameters
    ----------
    topology : Topology or DirectedTopology
        The topology
    fast : bool
        If True returns betweenness centrality instead of NFUR
    parallelize : bool
        If True, spawns as many processes as the number of cores of the
        machine using the map-reduce algorithm. It is always recommended
        unless the topology is very small. If *fast* parameter is True, this
        option is ignored, as betweenness centrality calculation cannot be
        parallelized.

    Returns
    -------
    nfur : dict
        A dictionary of the NFURs of the topology keyed by node. If fast is
        True, returns betweenness centrality instead.

    Notes
    -----
    The time complexity of the calculation of NFUR grows linearly with the
    number of links. A topology with thousands of links is likely to take
    hours to compute on commodity hardware. For this reason, this function
    can spawn as many processes as the number of cores of the machine on
    which it runs and parallelizes the task with a map-reduce algorithm.
    """
    # Note: The NFUR calculation doesn't scale because I need to calc betw
    # for each edge removed. With many nodes and edges, takes very long. The
    # parallelization reduced the time but it can still be very long
    betw = nx.betweenness_centrality(topology, normalized=False,
                                     weight='weight')
    if fast:
        return betw
    # Materialize the edges: depending on the networkx version edges() may
    # return a view that cannot be sliced into chunks
    edges = list(topology.edges())
    if not parallelize:
        # execute the NFUR calculation in one single process
        # Recommended only if the size of the topology is so small that the
        # overhead of creating new processes overcomes the performance gains
        # achieved by splitting the calculation
        return __nfur_func(topology, edges, betw)
    try:
        processes = mp.cpu_count()
    except NotImplementedError:
        processes = 32  # upper bound of number of cores on a commodity server
    # map operation
    # With fewer edges than processes the integer division yields 0, which
    # is not a usable chunk size
    chunk_size = max(1, len(edges) // processes)
    edges_chunks = util.split_list(edges, chunk_size)
    args = [(__nfur_func, (topology, chunk, betw)) for chunk in edges_chunks]
    pool = mp.Pool(processes)
    try:
        result = pool.map(util.map_func, args)
    finally:
        # Always release the worker processes, even if the map fails
        pool.close()
        pool.join()
    # reduce operation
    return {v: max(partial[v] for partial in result) for v in betw}


def __nfur_func(topology, edges, betweenness):
    """
    Calculate NFUR on a specific set of edges

    Parameters
    ----------
    topology : Topology
        The topology
    edges : list
        The list of edges (subset of topology edges)
    betweenness : dict
        The betweeness centrality of the topology, keyed by node

    Returns
    -------
    nfur : dict
        NFUR values keyed by node, only relative to failures of the specified
        edges
    """
    nfur = betweenness.copy()
    # Work on a copy: edges are temporarily removed below
    topology = topology.copy()
    for u, v in edges:
        # Snapshot the attributes so that the edge can be restored exactly
        edge_attr = dict(topology.adj[u][v])
        topology.remove_edge(u, v)
        betw = nx.betweenness_centrality(topology, normalized=False,
                                         weight='weight')
        # Keep, for each node, the highest value seen over all failures
        for node, value in betw.items():
            if value > nfur[node]:
                nfur[node] = value
        topology.add_edge(u, v, **edge_attr)
    return nfur

# Note: Calling networkx's all_pairs_shortest_path does not return multiple
# paths with same cost (and apparently doesn't even select path randomly,
# but selects the next hop with lowest ID).
def validate_traffic_matrix(topology, traffic_matrix, validate_load=False):
    """
    Validate whether a given traffic matrix and given topology are
    compatible.

    Returns True if they are compatible, False otherwise

    This validation includes validating whether the origin-destination pairs
    of the traffic matrix coincide with or are a subset of the
    origin-destination pairs of the topology. Optionally, this function can
    verify if the volumes of the traffic matrix are compatible too, i.e. if
    at any time, no link has an utilization greater than 1.0.

    Parameters
    ----------
    topology : topology
        The topology against which the traffic matrix is validated
    traffic_matrix : TrafficMatrix or TrafficMatrixSequence
        The traffic matrix (or sequence of) to be validated
    validate_load : bool, optional
        Specify whether load compatibility has to be validated or not.
        Default value is False

    Returns
    -------
    is_valid : bool
        True if the topology and the traffic matrix are compatible,
        False otherwise

    Raises
    ------
    ValueError : if traffic_matrix is neither a TrafficMatrix nor a
    TrafficMatrixSequence
    """
    if isinstance(traffic_matrix, TrafficMatrix):
        matrices = [traffic_matrix]
    elif isinstance(traffic_matrix, TrafficMatrixSequence):
        matrices = traffic_matrix.matrix
    else:
        raise ValueError('tm must be either a TrafficMatrix or a '\
                         ' TrafficMatrixSequence object')
    # Always work on a private, directed copy: loads are written on edges
    topology = topology.copy() if topology.is_directed() \
               else topology.to_directed()
    # A set makes the membership test below O(1) per OD pair
    od_pairs_topology = set(od_pairs_from_topology(topology))
    if validate_load:
        shortest_path = dict(nx.all_pairs_dijkstra_path(topology,
                                                        weight='weight'))
    for matrix in matrices:
        od_pairs_tm = matrix.od_pairs()
        # verify that OD pairs in TM are equal or subset of topology
        if not all((o, d) in od_pairs_topology for o, d in od_pairs_tm):
            return False
        if validate_load:
            for u, v in topology.edges():
                topology.adj[u][v]['load'] = 0
            # Conversion factor from the unit of the volumes to the unit of
            # the link capacities
            capacity_unit = capacity_units[topology.graph['capacity_unit']]
            volume_unit = capacity_units[matrix.attrib['volume_unit']]
            norm_factor = float(volume_unit) / float(capacity_unit)
            for o, d in od_pairs_tm:
                path = shortest_path[o][d]
                if len(path) <= 1:
                    continue
                for u, v in zip(path[:-1], path[1:]):
                    topology.adj[u][v]['load'] += matrix.flow[o][d]
            max_u = max((norm_factor * float(topology.adj[u][v]['load'])
                         / float(topology.adj[u][v]['capacity'])
                         for u, v in topology.edges()))
            if max_u > 1.0:
                return False
    return True
def read_traffic_matrix(path, encoding='utf-8'):
    """
    Parse the traffic matrix (or sequence of matrices) stored in an XML file.

    The ``type`` attribute of the root element selects what is returned:
    ``'single'`` yields a TrafficMatrix built from the first ``time``
    element, ``'sequence'`` yields a TrafficMatrixSequence with one matrix
    per ``time`` element.

    Parameters
    ----------
    path: str
        The path of the XML file to parse
    encoding : str, optional
        The encoding of the file. It is accepted but not used by this
        function.

    Returns
    -------
    tm : TrafficMatrix or TrafficMatrixSequence

    Raises
    ------
    ParseError : if the root ``type`` attribute is neither 'single' nor
    'sequence', or if a ``volume_unit`` property is not a known capacity unit
    """
    def read_properties(node):
        """Return (name, typed value) for each property child of node"""
        return [(child.attrib['name'],
                 util.xml_cast_type(child.attrib['type'], child.text))
                for child in node.findall('property')]

    def matrix_from_time_node(time_node):
        """Build a TrafficMatrix out of one ``time`` element"""
        matrix = TrafficMatrix()
        for name, value in read_properties(time_node):
            if name == 'volume_unit' and value not in capacity_units:
                raise ET.ParseError(
                    'Invalid volume_unit property in time node')
            matrix.attrib[name] = value
        for origin_node in time_node.findall('origin'):
            source = util.xml_cast_type(origin_node.attrib['id.type'],
                                        origin_node.attrib['id'])
            for destination_node in origin_node.findall('destination'):
                target = util.xml_cast_type(
                    destination_node.attrib['id.type'],
                    destination_node.attrib['id'])
                matrix.add_flow(source, target, float(destination_node.text))
        return matrix

    root = ET.parse(path).getroot()
    matrix_type = root.attrib['type']
    if matrix_type == 'single':
        return matrix_from_time_node(root.find('time'))
    if matrix_type != 'sequence':
        raise ET.ParseError('Invalid TM type attribute in XML file')
    sequence = TrafficMatrixSequence()
    for name, value in read_properties(root):
        sequence.attrib[name] = value
    for time_node in root.findall('time'):
        sequence.append(matrix_from_time_node(time_node))
    return sequence
def write_traffic_matrix(traffic_matrix, path, encoding='utf-8',
                         prettyprint=True):
    """
    Write a TrafficMatrix or a TrafficMatrixSequence object to an XML file.
    This function can be use to either persistently store a traffic matrix
    for later use or to export it to an FNSS adapter for a simulator or an
    API for another programming language.

    Parameters
    ----------
    traffic_matrix : TrafficMatrix or TrafficMatrixSequence
        The traffic matrix to save
    path : str
        The path where the file will be saved
    encoding : str, optional
        The desired encoding of the output file
    prettyprint : bool, optional
        Specify whether the XML file should be written with indentation for
        improved human readability

    Raises
    ------
    ValueError : if traffic_matrix is neither a TrafficMatrix nor a
    TrafficMatrixSequence
    """
    def add_properties(parent, attrib):
        """Append one typed ``property`` element per item of attrib"""
        for name, value in attrib.items():
            prop = ET.SubElement(parent, "property")
            prop.attrib['name'] = str(name)
            prop.attrib['type'] = util.xml_type(value)
            prop.text = str(value)

    head = ET.Element("traffic-matrix")
    if isinstance(traffic_matrix, TrafficMatrix):
        head.attrib['type'] = 'single'
        matrices = [traffic_matrix]
    elif isinstance(traffic_matrix, TrafficMatrixSequence):
        # Sequence-level properties (e.g. interval and time unit)
        add_properties(head, traffic_matrix.attrib)
        head.attrib['type'] = 'sequence'
        matrices = traffic_matrix.matrix
    else:
        raise ValueError('traffic_matrix parameter must be either a '
                         'TrafficMatrix or a TrafficMatrixSequence instance')
    # enumerate gives the position of each matrix even if the same object
    # appears more than once in the sequence
    for seq, matrix in enumerate(matrices):
        time = ET.SubElement(head, "time")
        time.attrib['seq'] = str(seq)
        add_properties(time, matrix.attrib)
        for o in matrix.flow:
            origin = ET.SubElement(time, "origin")
            origin.attrib['id'] = str(o)
            origin.attrib['id.type'] = util.xml_type(o)
            for d, volume in matrix.flow[o].items():
                destination = ET.SubElement(origin, "destination")
                destination.attrib['id'] = str(d)
                destination.attrib['id.type'] = util.xml_type(d)
                destination.text = str(volume)
    if prettyprint:
        util.xml_indent(head)
    ET.ElementTree(head).write(path, encoding=encoding)