|
| 1 | +import math |
| 2 | +from collections import deque |
| 3 | + |
| 4 | + |
| 5 | +def _build_adjacency_list(nodes, edges): |
| 6 | + adj = {name: [] for name in nodes} |
| 7 | + for edge in edges: |
| 8 | + adj[edge['from']].append(edge['to']) |
| 9 | + return adj |
| 10 | + |
| 11 | + |
| 12 | +def _topological_sort(graph): |
| 13 | + in_degree = {u: 0 for u in graph} |
| 14 | + for u in graph: |
| 15 | + for v in graph[u]: |
| 16 | + in_degree[v] += 1 |
| 17 | + queue = deque([u for u in graph if in_degree[u] == 0]) |
| 18 | + sorted_order = [] |
| 19 | + while queue: |
| 20 | + u = queue.popleft() |
| 21 | + sorted_order.append(u) |
| 22 | + for v in graph[u]: |
| 23 | + in_degree[v] -= 1 |
| 24 | + if in_degree[v] == 0: queue.append(v) |
| 25 | + if len(sorted_order) == len(graph): |
| 26 | + return sorted_order |
| 27 | + else: |
| 28 | + raise ValueError("Graph contains a cycle.") |
| 29 | + |
| 30 | + |
| 31 | +def _line_intersection(p1, p2, p3, p4): |
| 32 | + def orientation(p, q, r): |
| 33 | + val = (q[1] - p[1]) * (r[0] - q[0]) - (q[0] - p[0]) * (r[1] - q[1]) |
| 34 | + if val == 0: return 0 |
| 35 | + return 1 if val > 0 else 2 |
| 36 | + |
| 37 | + o1, o2, o3, o4 = orientation(p1, p2, p3), orientation(p1, p2, p4), orientation(p3, p4, p1), orientation(p3, p4, p2) |
| 38 | + if o1 != o2 and o3 != o4: return True |
| 39 | + return False |
| 40 | + |
| 41 | + |
| 42 | +class ForceDirectedLayout: |
| 43 | + def __init__(self, nodes, edges, height, node_size=(64, 64), iterations=300, |
| 44 | + k_repel=60000, k_attract=0.3, k_edge_repel=15, ideal_length=120, min_x_spacing=60): |
| 45 | + self._nodes = nodes |
| 46 | + self._edges = edges |
| 47 | + self._node_size = node_size |
| 48 | + self._iterations = iterations |
| 49 | + self._k_repel, self._k_attract, self._k_edge_repel = k_repel, k_attract, k_edge_repel |
| 50 | + self._ideal_length, self._min_x_spacing = ideal_length, min_x_spacing |
| 51 | + |
| 52 | + adj_list = _build_adjacency_list(nodes, edges) |
| 53 | + self._sorted_nodes = _topological_sort(adj_list) |
| 54 | + self._parents = {u: [] for u in nodes} |
| 55 | + for edge in edges: self._parents[edge['to']].append(edge['from']) |
| 56 | + |
| 57 | + self._positions = {} |
| 58 | + ranks = self._calculate_ranks() |
| 59 | + nodes_by_rank = {} |
| 60 | + for node, rank in ranks.items(): |
| 61 | + if rank not in nodes_by_rank: nodes_by_rank[rank] = [] |
| 62 | + nodes_by_rank[rank].append(node) |
| 63 | + for rank, nodes_in_rank in nodes_by_rank.items(): |
| 64 | + x_pos = rank * (self._node_size[0] + self._min_x_spacing + 20) + 70 |
| 65 | + num_in_rank = len(nodes_in_rank) |
| 66 | + if num_in_rank == 1: |
| 67 | + y_positions = [height / 2] |
| 68 | + else: |
| 69 | + total_span = (num_in_rank - 1) * (self._node_size[1] + 50) |
| 70 | + start_offset = -total_span / 2.0 |
| 71 | + y_positions = [height / 2 + start_offset + i * (self._node_size[1] + 50) for i in range(num_in_rank)] |
| 72 | + for i, node in enumerate(nodes_in_rank): |
| 73 | + existing_nodes_position = nodes[node]['position'] if 'position' in nodes[node] else [0, 0] |
| 74 | + if existing_nodes_position == [0 , 0]: |
| 75 | + self._positions[node] = [x_pos, y_positions[i]] |
| 76 | + else: |
| 77 | + self._positions[node] = [existing_nodes_position[0], existing_nodes_position[1]] |
| 78 | + |
| 79 | + def _calculate_ranks(self): |
| 80 | + ranks = {node: 0 for node in self._nodes} |
| 81 | + for node in self._sorted_nodes: |
| 82 | + if self._parents[node]: |
| 83 | + ranks[node] = max(ranks[p] for p in self._parents[node]) + 1 |
| 84 | + return ranks |
| 85 | + |
| 86 | + def max_iterations(self): |
| 87 | + return self._iterations |
| 88 | + |
| 89 | + def positions(self): |
| 90 | + return self._positions |
| 91 | + |
| 92 | + def get_port_position(self, node_name, port_type, port_index): |
| 93 | + center_x, center_y = self._positions[node_name] |
| 94 | + node_width, node_height = self._node_size |
| 95 | + if port_type == 'input': |
| 96 | + num_ports, x_pos = self._nodes[node_name]['inputs'], center_x - node_width / 2 |
| 97 | + else: |
| 98 | + num_ports, x_pos = self._nodes[node_name]['outputs'], center_x + node_width / 2 |
| 99 | + if num_ports <= 1: return [x_pos, center_y] |
| 100 | + vertical_span = node_height * 0.8 |
| 101 | + spacing = vertical_span / (num_ports - 1) |
| 102 | + start_offset = -vertical_span / 2.0 |
| 103 | + y_pos = center_y + start_offset + port_index * spacing |
| 104 | + return [x_pos, y_pos] |
| 105 | + |
| 106 | + def _calculate_forces(self): |
| 107 | + forces = {node: [0.0, 0.0] for node in self._nodes} |
| 108 | + node_list = list(self._nodes.keys()) |
| 109 | + for i in range(len(node_list)): |
| 110 | + for j in range(i + 1, len(node_list)): |
| 111 | + node1, node2 = node_list[i], node_list[j] |
| 112 | + dx, dy = self._positions[node1][0] - self._positions[node2][0], self._positions[node1][1] - self._positions[node2][ |
| 113 | + 1] |
| 114 | + distance = math.sqrt(dx ** 2 + dy ** 2) + 0.0001 |
| 115 | + fy = (dy / distance) * (self._k_repel / (distance ** 2)) |
| 116 | + forces[node1][1] += fy |
| 117 | + forces[node2][1] -= fy |
| 118 | + for edge in self._edges: |
| 119 | + pos1 = self.get_port_position(edge['from'], 'output', edge['from_port']) |
| 120 | + pos2 = self.get_port_position(edge['to'], 'input', edge['to_port']) |
| 121 | + dx, dy = pos1[0] - pos2[0], pos1[1] - pos2[1] |
| 122 | + distance = math.sqrt(dx ** 2 + dy ** 2) + 0.0001 |
| 123 | + fy_attr = (dy / distance) * (self._k_attract * (distance - self._ideal_length)) |
| 124 | + forces[edge['from']][1] -= fy_attr |
| 125 | + forces[edge['to']][1] += fy_attr |
| 126 | + if self._k_edge_repel > 0: |
| 127 | + for i in range(len(self._edges)): |
| 128 | + for j in range(i + 1, len(self._edges)): |
| 129 | + edge1, edge2 = self._edges[i], self._edges[j] |
| 130 | + if len({edge1['from'], edge1['to'], edge2['from'], edge2['to']}) < 4: continue |
| 131 | + e1_p1 = self.get_port_position(edge1['from'], 'output', edge1['from_port']) |
| 132 | + e1_p2 = self.get_port_position(edge1['to'], 'input', edge1['to_port']) |
| 133 | + e2_p1 = self.get_port_position(edge2['from'], 'output', edge2['from_port']) |
| 134 | + e2_p2 = self.get_port_position(edge2['to'], 'input', edge2['to_port']) |
| 135 | + if _line_intersection(e1_p1, e1_p2, e2_p1, e2_p2): |
| 136 | + mid1_y, mid2_y = (e1_p1[1] + e1_p2[1]) / 2, (e2_p1[1] + e2_p2[1]) / 2 |
| 137 | + force_y = self._k_edge_repel if mid1_y < mid2_y else -self._k_edge_repel |
| 138 | + forces[edge1['from']][1] -= force_y |
| 139 | + forces[edge1['to']][1] -= force_y |
| 140 | + forces[edge2['from']][1] += force_y |
| 141 | + forces[edge2['to']][1] += force_y |
| 142 | + return forces |
| 143 | + |
| 144 | + def update_layout(self, damping): |
| 145 | + forces = self._calculate_forces() |
| 146 | + for node in self._sorted_nodes: |
| 147 | + _, fy = forces[node] |
| 148 | + self._positions[node][1] += fy * damping |
| 149 | + max_parent_x = 0 |
| 150 | + if self._parents[node]: |
| 151 | + max_parent_x = max(self._positions[p][0] for p in self._parents[node]) |
| 152 | + self._positions[node][0] = max_parent_x + self._node_size[0] + self._min_x_spacing |
0 commit comments