Python中并行搜索图的几种算法代码

一个使用图进行并行编程任务的简单示例。

给定一个有 N 个节点、M 条边和一个源节点 S 的有向加权图,使用并行编程找出从源节点 S 到图中所有其他节点的最短距离。

要使用并行编程并行计算加权有向图中的最短距离,可以使用并行版的 Dijkstra 算法。

Dijkstra 算法是一种著名的查找图中节点间最短路径的算法。该算法维护一组已知与源点最短距离的顶点,并反复选择距离最小的顶点,放宽其相邻顶点。

Dijkstra 算法和广度优先搜索(BFS)都是用于遍历和探索图的算法,但它们的用途不同,具有不同的特点。

Dijkstra 算法

  1. 目的:
    • Dijkstra 算法主要用于查找加权图中源节点与所有其他节点之间的最短路径。
    • 它适用于具有非负边权重的图。
  • 边权重:
    • 它需要非负边权重。
    • Dijkstra 算法维护一个优先级队列或最小堆来选择每一步距离最小的节点。
  • 最优性:
    • Dijkstra算法保证最优性;一旦节点的最短路径确定,以后就不会更新。
  • 遍历策略:
    • 它使用贪心策略,总是选择尝试距离最小的节点。

    广度优先搜索(BFS):

    1. 目的:
      • BFS 用于在图中进行遍历或搜索。
      • 它不考虑边权重;它通常用于未加权或均匀加权图。
  • 边权重:
    • BFS 适用于未加权或等权的图。
  • 最优性:
    • BFS 不一定能找到最短路径,尤其是在存在加权边的情况下。
  • 遍历策略:
    • BFS 在移动到下一级节点之前会探索节点的所有邻居。

    比较:

    • 使用案例:
      • 当您需要在具有非负权重的图中查找最短路径时,请使用 Dijkstra 算法。
      • 当您需要探索或遍历图时,尤其是在没有边权重或所有边权重相等的情况下,请使用 BFS。
    • 边权重:
      • Dijkstra 算法考虑了边权重,适用于加权图。
      • BFS 与权重无关,通常用于未加权或等权重图。
    • 最优性:
      • Dijkstra 算法保证了寻找最短路径的最优性。
      • BFS不一定能找到最短路径;它根据边数找到最短路径。
    • 数据结构:
      • Dijkstra算法通常需要优先级队列或最小堆来有效地选择具有最小距离的节点。
      • BFS通常使用队列进行遍历。

    Dijkstra 算法是专门为寻找加权图中的最短路径而设计的,而 BFS 是一种更通用的算法,用于在不考虑边权重的情况下探索或遍历图。

    Dijkstra 算法Python代码

    import networkx as nx
    from concurrent.futures import ProcessPoolExecutor

    def parallel_dijkstra(graph, source):
        num_nodes = graph.number_of_nodes()

       # 初始化距离和访问集
        distances = {node: float('inf') for node in graph.nodes}
        distances[source] = 0
        visited = set()

        with ProcessPoolExecutor(max_workers=num_nodes) as executor:
            while len(visited) < num_nodes:
               # 查找尚未访问的距离最小的节点
                candidates = [(node, distances[node]) for node in graph.nodes if node not in visited]
                current_node, min_distance = min(candidates, key=lambda x: x[1])

               # 将当前节点标记为已访问
                visited.add(current_node)

               # 同步放松邻居
                neighbors = list(graph.neighbors(current_node))
                with executor.map(process_neighbor, [(current_node, neighbor) for neighbor in neighbors]) as results:
                    for neighbor, weight in results:
                        new_distance = distances[current_node] + weight
                        if new_distance < distances[neighbor]:
                            distances[neighbor] = new_distance

        return distances

    def process_neighbor(args):
        # Simulate some computation on the edge weight
        # In a real-world scenario, this function would perform more meaningful work
        current_node, neighbor = args
        return neighbor, graph[current_node][neighbor]['weight']

    if __name__ == "__main__":
       # 生成随机加权有向图
        graph = nx.gn_graph(10, seed=42, directed=True)
        for edge in graph.edges:
            graph[edge[0]][edge[1]]['weight'] = graph[edge[0]][edge[1]]['weight'] + 1

       # 为 Dijkstra 算法设置源节点
        source_node = 0

       # 执行并行 Dijkstra 算法
        result = parallel_dijkstra(graph, source_node)

        print(
    "Shortest Distances:", result)


    在本示例中,parallel_dijkstra 函数通过使用 ProcessPoolExecutor 并发地放松当前节点的邻居,从而并行执行 Dijkstra 算法。process_neighbor 函数模拟了边权重的一些计算,在实际场景中,该函数将执行与图的边权重相关的更有意义的工作。

    请注意,并行化带来的实际速度提升取决于图的大小、CPU 可用内核数量以及相关计算的复杂性等因素。此外,本示例假定边权重为正值;对于负权重图或需要额外同步的情况,可能需要进行调整。

    并行化Dijkstra 算法C++代码
    这一问题通常使用 Dijkstra 算法顺序解决。然而,我们的任务是有效地并行化这一算法,以利用并行计算的能力。以下是上述方法的实现

    include <bits/stdc++.h>
    using namespace std;

    const int INF = 1e9;

    int V;
    vector<vector<pair<int, int> > > adj;

    void addEdge(int u, int v, int w)
    {
        adj[u].push_back({ v, w });
    }

    void sequentialDijkstra(int src, vector<int>& dist)
    {
        // 将所有距离初始化为无限远
        dist.assign(V, INF);
        dist[src] = 0;

        
    // 创建一个集合,用于存储具有最小
        
    //距离的顶点
        vector<bool> processed(V, false);

        
    // record the start time
        auto start_time = chrono::high_resolution_clock::now();

        
    // 查找所有顶点的最短路径
        for (int count = 0; count < V - 1; ++count) {
            int u = -1;
            for (int i = 0; i < V; ++i) {
                if (!processed[i]
                    && (u == -1 || dist[i] < dist[u]))
                    u = i;
            }

            
    //将选中的顶点标记为已处理
            processed[u] = true;

            
    // 更新
           
    // 选取顶点的相邻顶点的距离值。
            for (const auto& edge : adj[u]) {
                int v = edge.first;
                int w = edge.second;
                if (!processed[v] && dist[u] != INF
                    && dist[u] + w < dist[v]) {
                    dist[v] = dist[u] + w;
                }
            }
        }

        
    // record the end time
        auto end_time = chrono::high_resolution_clock::now();
        auto duration
            = chrono::duration_cast<chrono::microseconds>(
                end_time - start_time);

        
    // Print the sequential execution time
        cout <<
    "Sequential Dijkstra Execution Time: "
            << duration.count() <<
    " microseconds" << endl;
    }

    void parallelDijkstra(int src, vector<int>& dist)
    {
        dist.assign(V, INF);
        dist[src] = 0;

        
    // Create a set to store vertices with the minimum
        
    // distance
        vector<bool> processed(V, false);

        
    // 记录开始时间
        auto start_time = chrono::high_resolution_clock::now();

        
    // Find shortest path for all vertices
        for (int count = 0; count < V - 1; ++count) {
            int u = -1;
    pragma omp parallel for
            for (int i = 0; i < V; ++i) {
                if (!processed[i]
                    && (u == -1 || dist[i] < dist[u]))
    pragma omp critical
                    u = (u == -1 || dist[i] < dist[u]) ? i : u;
            }

            
    // Mark the picked vertex as processed
            processed[u] = true;

            
    // 使用并行编程更新
           
    // 选取顶点的相邻顶点的距离值
    pragma omp parallel for
            for (const auto& edge : adj[u]) {
                int v = edge.first;
                int w = edge.second;
                if (!processed[v] && dist[u] != INF
                    && dist[u] + w < dist[v]) {
    pragma omp critical
                    if (dist[u] != INF && dist[u] + w < dist[v])
                        dist[v] = dist[u] + w;
                }
            }
        }

        
    // Record the end time
        auto end_time = chrono::high_resolution_clock::now();
        auto duration
            = chrono::duration_cast<chrono::microseconds>(
                end_time - start_time);

        
    // Print the parallel execution time
        cout <<
    "Parallel Dijkstra Execution Time: "
            << duration.count() <<
    " microseconds"
            <<
    "\n";
    }

    int main()
    {
        int S = 0;
        V = 5;
        adj.resize(V);
        addEdge(0, 1, 2);
        addEdge(0, 2, 4);
        addEdge(1, 2, 1);
        addEdge(1, 3, 7);
        addEdge(2, 3, 3);
        addEdge(3, 4, 5);
        addEdge(3, 4, 2);
        addEdge(4, 0, 6);

        vector<int> dist(V, 1e9);

        sequentialDijkstra(S, dist);
        parallelDijkstra(S, dist);
        cout <<
    "Shortest distances from Vertex " << S << ":\n";
        for (int i = 0; i < V; ++i) {
            cout <<
    "Vertex " << i << ": " << dist[i];
            if (i != V - 1)
                cout <<
    ", ";
        }

        return 0;
    }
    输出
    Output
    Sequential Dijkstra Execution Time: 2 microseconds
    Parallel Dijkstra Execution Time: 1 microseconds
    Shortest distances from Vertex 0:
    Vertex 0: 0, Vertex 1: 2, Vertex 2: 3, Vertex 3: 6, Vertex 4: 8

    图的并行执行与顺序执行之间的时间差为 1 微秒。

    广度优先搜索(BFS)算法
    让我们来看一个经典的例子:在图上并行化广度优先搜索(BFS)算法。BFS 用于从给定的源顶点开始逐级遍历图。

    在本例中,我将使用 Python 和 concurrent.futures 模块进行并行化。为了演示的目的,将使用 networkx 库生成随机图:
    pip install networkx


    现在,让我们创建一个简单的并行 BFS 实现:

    import networkx as nx
    from concurrent.futures import ProcessPoolExecutor
    from collections import deque

    def parallel_bfs(graph, source, num_workers):
        visited = set()
        level = {source: 0}
        queue = deque([source])

        with ProcessPoolExecutor(max_workers=num_workers) as executor:
            while queue:
                current_node = queue.popleft()
                if current_node not in visited:
                    visited.add(current_node)
                    neighbors = list(graph.neighbors(current_node))
                    with executor.map(process_neighbor, neighbors) as results:
                        for neighbor, distance in results:
                            if neighbor not in level:
                                level[neighbor] = level[current_node] + distance
                                queue.append(neighbor)

        return level

    def process_neighbor(neighbor):
       # 模拟对邻居进行一些计算
        # 在现实世界中,该函数将执行更有意义的工作
        return neighbor, 1

    if __name__ == "__main__":
       # 生成随机图形
        graph = nx.erdos_renyi_graph(100, 0.1)

        # 设置并行工作者的数量
        num_workers = 4

       # 为 BFS 指定源节点
        source_node = 0

       # 执行并行 BFS
        result = parallel_bfs(graph, source_node, num_workers)

        print(
    "BFS Levels:", result)


    在本例中,parallel_bfs 函数使用 ProcessPoolExecutor 来并行处理 BFS 遍历。BFS 算法会并行处理邻居,而 process_neighbor 函数会模拟对每个邻居的计算。计算结果是一个字典,其中包含每个节点到源节点的距离。

    需要注意的是,由于创建进程的开销,以这种方式并行处理 BFS 并不总能显著提高小型图的速度。但是,对于较大的图,并行化可以提高性能。此外,在实际场景中,process_neighbor 函数会对每个邻居执行更有意义的计算。

    单源最短路径 (SSSP) 算法
    Dijkstra 算法是单源最短路径 (SSSP) 算法的一个特定实例。术语“单源最短路径”是指更广泛的算法类别,用于查找从单个源节点到图中所有其他节点的最短路径。Dijkstra 算法只是 SSSP 算法的一个示例。

    Dijkstra 算法是一种特定的 SSSP 算法,旨在查找具有非负边权的图中从源节点到所有其他节点的最短路径。

    1. 边权重:
      • Dijkstra 算法适用于具有非负边权重的图。
      • 它维护一个优先级队列或一个最小堆来选择每一步距离最小的节点。
  • 最优性:
    • Dijkstra算法保证最优性;一旦节点的最短路径确定,以后就不会更新。

    单源最短路径 (SSSP) 算法:

    1. 目的:
      • 一般来说,SSSP 算法的目标是找到图中从单个源节点到所有其他节点的最短路径。
  • 种类:
    • SSSP 范畴内有多种算法,它们可能具有不同的要求和特征。
    • SSSP 算法的其他示例包括 Bellman-Ford 算法(处理负边权重,但收敛速度可能较慢)、A* 算法(基于启发式)等。
  • 边权重:
    • 根据具体算法,SSSP 算法可以处理具有各种类型边权重的图,包括负权重(在 Bellman-Ford 的情况下)。
  • 最优性:
    • SSSP算法的最优性取决于具体算法。有些算法(例如 Dijkstra 算法)可以保证在某些条件下的最优性。

    比较:

    • 使用案例:
      • Dijkstra 算法是 SSSP 在处理非负边权重时的具体选择。
      • 可以根据特定要求选择其他 SSSP 算法,例如处理负权重或合并启发式算法。
    • 边权重:
      • Dijkstra 算法假设边缘权重非负。
      • 一般来说,SSSP 算法可以根据其设计处理各种类型的边权重。
    • 最优性:
      • Dijkstra算法保证了边权非负条件下的最优性。
      • 其他 SSSP 算法可能会也可能不会保证最优性,具体取决于它们的设计和它们处理的图属性。

    下面是使用三角步法实现的并行单源最短路径(SSSP)算法:

    from concurrent.futures import ThreadPoolExecutor

    def delta_stepping(edges, source, workers):
      """
      Finds the shortest distance from a source node to all other nodes in a directed graph using Delta Stepping.

      Args:
        edges: A list of edges represented as tuples (source, destination, weight).
        source: The source node.
        workers: The number of parallel workers.

      Returns:
        distances: A dictionary mapping each node to its shortest distance from the source.
     
    """
      distances = {node: float('inf') for node in range(len(edges))}
      distances[source] = 0

      # Function to update distances for a single node
      def update_node(node):
        for dest, weight in edges[node]:
          distances[dest] = min(distances[dest], distances[node] + weight)

      with ThreadPoolExecutor(max_workers=workers) as executor:
        # Parallel update of distances
        for _ in range(len(edges)):
          executor.map(update_node, range(len(edges)))

      return distances

    # Example usage
    edges = [
      (0, 1, 10),
      (0, 2, 3),
      (1, 3, 2),
      (2, 3, 1),
      (2, 4, 5),
      (3, 4, 2),
    ]
    source = 0
    workers = 4

    distances = delta_stepping(edges, source, workers)

    print(
    "Shortest distances from source:", distances)

    该代码实现了以下步骤:

    • 初始化 dictionary distances,以存储从源点到所有节点的最短距离。
    • 将源节点的距离设为 0,其他节点的距离设为无穷大。
    • 定义一个函数 update_node,用于更新从给定节点可到达的所有节点的距离。
    • 利用包含工人线程的 ThreadPoolExecutor 来并行更新距离。
    • 遍历边,并为每个节点并行调用 update_node,确保每条边只被处理一次。
    • 最后,字典距离包含从源点到所有其他节点的最短路径。

    这是使用三角步法实现并行 SSSP 的基本方法。您可以通过以下方法对其进行进一步优化
    • 利用 NetworkX 等图形库进行高效图形处理。
    • 实施工作偷取方法,处理工作者之间的工作量不平衡问题。
    • 根据需要适应不同的并行编程范式,如 CUDA 或 OpenMP。