###########################################################
#                                                         #
#    This file contains various classes and methods       #
#    needed to handle directed network systems            #
#                                                         #
###########################################################

import numpy as np

from c_bdd import BDDStructure
from c_robdd import ROBDDStructure


class DGraphNode:
    """A node of a directed graph.

    A node keeps the list of edges pointing into it (``inEdges``), the list
    of edges leaving it (``outEdges``), a traversal mark, a temporary integer
    ID and a binary state.
    """

    def __init__(self):
        self.inEdges = []
        self.outEdges = []
        self.marked = False
        self.tempID = -1
        self.state = 1  # 1 = FUNCTIONING, 0 = FAILED

    def addInEdge(self, e):
        """Register ``e`` as an incoming edge unless it is already present."""
        if e not in self.inEdges:
            self.inEdges.append(e)

    def removeInEdge(self, e):
        """Remove ``e`` from the incoming edges; do nothing if it is absent."""
        if e in self.inEdges:
            self.inEdges.remove(e)

    def addOutEdge(self, e):
        """Register ``e`` as an outgoing edge unless it is already present."""
        if e not in self.outEdges:
            self.outEdges.append(e)

    def removeOutEdge(self, e):
        """Remove ``e`` from the outgoing edges; do nothing if it is absent."""
        if e in self.outEdges:
            self.outEdges.remove(e)

    def removeAllInEgdes(self):
        """Forget every incoming edge of this node."""
        self.inEdges.clear()

    def removeAllOutEgdes(self):
        """Forget every outgoing edge of this node."""
        self.outEdges.clear()

    def removeAllEgdes(self):
        """Forget every incoming and outgoing edge of this node."""
        self.inEdges.clear()
        self.outEdges.clear()

    def inEdgeCount(self):
        """Return the number of incoming edges."""
        return len(self.inEdges)

    def outEdgeCount(self):
        """Return the number of outgoing edges."""
        return len(self.outEdges)

    def edgeCount(self):
        """Return the number of incoming plus outgoing edges."""
        return len(self.inEdges) + len(self.outEdges)

    def mergeNode(self, n):
        """Move every edge of node ``n`` over to this node.

        Edges that ended in ``n`` now end here, and edges that started in
        ``n`` now start here. Afterwards ``n`` has no edges left.
        """
        for e in list(n.inEdges):
            self.addInEdge(e)
            e.changeOutNode(self)
        n.removeAllInEgdes()

        for e in list(n.outEdges):
            self.addOutEdge(e)
            e.changeInNode(self)
        n.removeAllOutEgdes()

    def isMarked(self):
        """Return True if the node is currently marked."""
        return self.marked

    def mark(self):
        """Mark the node."""
        self.marked = True

    def unmark(self):
        """Clear the mark of the node."""
        self.marked = False

    def setTempID(self, i):
        """Store ``i`` as the temporary ID of the node."""
        self.tempID = i

    def getTempID(self):
        """Return the temporary ID of the node (-1 until one is assigned)."""
        return self.tempID

    def setState(self, s):
        """Set the state of the node (1 = FUNCTIONING, 0 = FAILED)."""
        self.state = s

    def getState(self):
        """Return the state of the node (1 = FUNCTIONING, 0 = FAILED)."""
        return self.state

    def isFunctioning(self):
        """Return True if the state of the node is 1."""
        return self.state == 1

    def isFailed(self):
        """Return True if the state of the node is 0."""
        return self.state == 0

    def markByState(self):
        """Mark this node and propagate the mark along its outgoing edges.

        A failed or already marked node stops the propagation.
        """
        if self.isFailed() or self.isMarked():
            return
        self.mark()
        for edge in self.outEdges:
            edge.markByState()


class DGraphEdge:
    """A directed edge running from ``inNode`` to ``outNode``."""

    def __init__(self, n1, n2):
        self.inNode = n1
        self.outNode = n2  # We allow loops, i.e., n1 may be equal to n2
        self.marked = False
        self.tempID = -1
        self.state = 1  # 1 = FUNCTIONING, 0 = FAILED

    def changeInNode(self, newInNode):
        """Replace the node the edge starts in."""
        self.inNode = newInNode

    def changeOutNode(self, newOutNode):
        """Replace the node the edge ends in."""
        self.outNode = newOutNode

    def isMarked(self):
        """Return True if the edge is currently marked."""
        return self.marked

    def mark(self):
        """Mark the edge."""
        self.marked = True

    def unmark(self):
        """Clear the mark of the edge."""
        self.marked = False

    def setTempID(self, i):
        """Store ``i`` as the temporary ID of the edge."""
        self.tempID = i

    def getTempID(self):
        """Return the temporary ID of the edge (-1 until one is assigned)."""
        return self.tempID

    def setState(self, s):
        """Set the state of the edge (1 = FUNCTIONING, 0 = FAILED)."""
        self.state = s

    def getState(self):
        """Return the state of the edge (1 = FUNCTIONING, 0 = FAILED)."""
        return self.state

    def isFunctioning(self):
        """Return True if the state of the edge is 1."""
        return self.state == 1

    def isFailed(self):
        """Return True if the state of the edge is 0."""
        return self.state == 0

    def markByState(self):
        """Mark this edge and propagate the mark to the node it ends in.

        A failed or already marked edge stops the propagation.
        """
        if self.isFailed() or self.isMarked():
            return
        self.mark()
        self.outNode.markByState()


class DGraph:
    """A directed graph with a list of terminal nodes.

    The first entry of ``terminals`` is treated as the source node.
    """

    def __init__(self):
        self.nodes = []
        self.edges = []
        self.terminals = []

    def addNode(self):
        """Append a new, unconnected node to the graph."""
        self.nodes.append(DGraphNode())

    def addEdge(self, i1, i2):
        """Append an edge from node number ``i1`` to node number ``i2``."""
        tail = self.nodes[i1]
        head = self.nodes[i2]
        edge = DGraphEdge(tail, head)
        tail.addOutEdge(edge)
        head.addInEdge(edge)
        self.edges.append(edge)

    def makeGraph(self, matrix):
        """Build nodes and edges from a node-by-edge incidence matrix.

        Each row yields one node. In a column, -1 identifies the node the
        edge starts in and 1 the node it ends in (the last matching row wins
        if there are several).

        A column whose only non-zero entry is 1 is rebuilt as a loop on that
        node. This is exactly how getIncidenceMatrix() writes a loop, so the
        edge numbering stays aligned with the matrix columns when a graph is
        rebuilt from a matrix produced by getIncidenceMatrix(). Columns
        without any 1 are skipped.
        """
        m, n = matrix.shape
        for _ in range(m):
            self.addNode()
        for j in range(n):
            tail = -1
            head = -1
            for row in range(m):
                entry = matrix[row, j]
                if entry == -1:
                    tail = row
                elif entry == 1:
                    head = row
            if head < 0:
                continue
            if tail < 0:
                # No -1 in the column: the edge is a loop on the head node.
                tail = head
            self.addEdge(tail, head)

    def makeTerminals(self, tlist):
        """Append the nodes with the indices in ``tlist`` to the terminals."""
        for i in tlist:
            self.terminals.append(self.nodes[i])

    def isSource(self, node):
        """Return True if ``node`` is the first terminal (the source).

        Returns False when the graph has no terminals at all.
        """
        return bool(self.terminals) and node == self.terminals[0]

    def isTerminal(self, node):
        """Return True if ``node`` is one of the terminals."""
        return node in self.terminals

    def getIncidenceMatrix(self):
        """Return the node-by-edge incidence matrix as an integer array.

        As a side effect every node gets its list position as temporary ID.
        For each edge the row of its start node holds -1 and the row of its
        end node holds 1. For a loop both rows coincide, and since the 1 is
        written last the column ends up holding a single 1.
        """
        for index, node in enumerate(self.nodes):
            node.setTempID(index)
        matrix = np.zeros((len(self.nodes), len(self.edges)), dtype=int)
        for j, edge in enumerate(self.edges):
            matrix[edge.inNode.getTempID(), j] = -1
            matrix[edge.outNode.getTempID(), j] = 1
        return matrix

    def getSourceID(self):
        """Return the index of the source node, which is always 0."""
        return 0

    def getTerminalList(self):
        """Return the list positions of the terminal nodes.

        Every node, including node 0, is renumbered first, so the result
        does not depend on getIncidenceMatrix() having been called before.
        """
        for index, node in enumerate(self.nodes):
            node.setTempID(index)
        return [terminal.getTempID() for terminal in self.terminals]

    def restriction(self, i=0):
        """Delete edge number ``i`` from the graph.

        Raises IndexError if there is no edge with that number.
        """
        edge = self.edges.pop(i)
        edge.inNode.removeOutEdge(edge)
        edge.outNode.removeInEdge(edge)

    def contraction(self, i=0):
        """Contract edge number ``i``.

        The edge is removed from the graph and from both of its end nodes.
        Unless the edge is a loop or ends in the source, its end node is then
        merged into its start node and dropped from the graph; if the dropped
        node was a terminal, the start node takes over that role.

        Raises IndexError if there is no edge with that number.
        """
        edge = self.edges.pop(i)
        tail = edge.inNode
        head = edge.outNode

        # Always detach the edge from both end nodes, so that no node keeps
        # a reference to an edge that is no longer part of the graph.
        tail.removeOutEdge(edge)
        head.removeInEdge(edge)

        if head == tail or self.isSource(head):
            return

        tail.mergeNode(head)
        self.nodes.remove(head)
        if self.isTerminal(head):
            self.terminals.remove(head)
            if not self.isTerminal(tail):
                self.terminals.append(tail)

    def unmarkAll(self):
        """Clear the mark of every node and every edge."""
        for node in self.nodes:
            node.unmark()
        for edge in self.edges:
            edge.unmark()

    def setNodeState(self, i, s):
        """Set the state of node number ``i`` to ``s``."""
        self.nodes[i].setState(s)

    def getNodeState(self, i):
        """Return the state of node number ``i``."""
        return self.nodes[i].getState()

    def setEdgeState(self, i, s):
        """Set the state of edge number ``i`` to ``s``."""
        self.edges[i].setState(s)

    def getEdgeState(self, i):
        """Return the state of edge number ``i``."""
        return self.edges[i].getState()

    def computeState(self):
        """Return 1 if every terminal is reached from the source, else 0.

        Marks are propagated from the first terminal through functioning
        nodes and edges. With no terminals there is nothing left to reach,
        so the result is 1.
        """
        self.unmarkAll()
        if not self.terminals:
            return 1
        self.terminals[0].markByState()
        # If at least one terminal is not marked, the system is failed.
        # If all terminals are marked, the system is functioning.
        if all(terminal.isMarked() for terminal in self.terminals):
            return 1
        return 0


def _buildDGraph(matrix, tlist):
    """Return a fresh DGraph built from an incidence matrix and terminals."""
    graph = DGraph()
    graph.makeGraph(matrix)
    graph.makeTerminals(tlist)
    return graph


class BDDDGraph(BDDStructure):
    """BDD structure for a directed network system.

    The system is described by an incidence matrix with one column per
    component (edge) and a list of terminal node indices.
    NOTE(review): ``order``, ``components``, ``pivotIndex`` and
    ``addComponent`` are not defined in this file; presumably they are
    provided by BDDStructure -- confirm against c_bdd.
    """

    def __init__(self, matrix, tlist):
        super().__init__(matrix.shape[1])
        self.incidenceMatrix = matrix
        self.terminalList = tlist
        self.graph = _buildDGraph(matrix, tlist)

    def selectPivotIndex(self):
        """Choose the first edge leaving node 0 (the source) as pivot.

        This pivot index will ensure that the contraction is a digraph
        system. Since we at this stage know that the system is not failed,
        there must exist such an edge. If no column has -1 in row 0, the
        pivot index is left untouched.
        """
        for i in range(self.order):
            if self.incidenceMatrix[0, i] == -1:
                self.pivotIndex = i
                break

    def _subStructure(self, graph):
        """Wrap ``graph`` in a BDDDGraph holding all non-pivot components."""
        matrix = graph.getIncidenceMatrix()
        tlist = graph.getTerminalList()
        struct = BDDDGraph(matrix, tlist)
        for i in range(self.order):
            if i != self.pivotIndex:
                struct.addComponent(self.components[i])
        return struct

    def restriction(self):
        """Return the structure obtained by deleting the pivot edge."""
        g = _buildDGraph(self.incidenceMatrix, self.terminalList)
        g.restriction(self.pivotIndex)
        return self._subStructure(g)

    def contraction(self):
        """Return the structure obtained by contracting the pivot edge."""
        g = _buildDGraph(self.incidenceMatrix, self.terminalList)
        g.contraction(self.pivotIndex)
        return self._subStructure(g)

    def isFunctioning(self):
        """Return True if at most one terminal is left."""
        return len(self.terminalList) <= 1

    def isFailed(self):
        """Return True if some terminal cannot be reached from the source."""
        g = _buildDGraph(self.incidenceMatrix, self.terminalList)
        return g.computeState() == 0


class ROBDDDGraph(ROBDDStructure):
    """ROBDD structure for a directed network system.

    The system is described by an incidence matrix with one column per
    component (edge) and a list of terminal node indices. Restriction and
    contraction always act on the first edge.
    """

    def __init__(self, matrix, tlist):
        super().__init__(matrix.shape[1])
        self.incidenceMatrix = matrix
        self.terminalList = tlist
        self.graph = _buildDGraph(matrix, tlist)

    def restriction(self):
        """Return the structure obtained by deleting the first edge."""
        g = _buildDGraph(self.incidenceMatrix, self.terminalList)
        g.restriction()
        return ROBDDDGraph(g.getIncidenceMatrix(), g.getTerminalList())

    def contraction(self):
        """Return the structure obtained by contracting the first edge."""
        g = _buildDGraph(self.incidenceMatrix, self.terminalList)
        g.contraction()
        return ROBDDDGraph(g.getIncidenceMatrix(), g.getTerminalList())

    def isFunctioning(self):
        """Return True if at most one terminal is left."""
        return len(self.terminalList) <= 1

    def isFailed(self):
        """Return True if some terminal cannot be reached from the source."""
        g = _buildDGraph(self.incidenceMatrix, self.terminalList)
        return g.computeState() == 0

    def isEquivalentTo(self, s):
        """Return True if ``s`` has the same matrix and terminal list.

        Two structures are equivalent when their incidence matrices have the
        same dimensions and entries and their terminal lists hold the same
        indices in the same order.
        """
        m1, n1 = self.incidenceMatrix.shape
        m2, n2 = s.incidenceMatrix.shape

        # First, we check if the dimensions are equal
        if (m1, n1) != (m2, n2):
            return False
        if len(self.terminalList) != len(s.terminalList):
            return False

        # Next, we check if the incidence matrices are equal
        if not np.array_equal(self.incidenceMatrix, s.incidenceMatrix):
            return False

        # Finally, we check if the terminal lists are equal
        return all(
            a == b for a, b in zip(self.terminalList, s.terminalList)
        )