1 随机游走效率探究

DeepWalk是一种广泛应用于图嵌入(Graph Embedding)领域的算法,自2014年提出以来,其在图嵌入领域占据了重要地位。作为图嵌入领域的奠基性工作,DeepWalk激发了大量后续研究,如Node2Vec、LINE、GraphSAGE等。许多算法在DeepWalk的基础上进行了扩展与改进,例如加入有监督信息、引入更复杂的游走策略等。 其成功应用推动了图神经网络(GNN)等新兴技术的发展和应用。

从每个节点出发进行随机游走是DeepWalk算法的关键步骤,具体来说,每一步从当前节点的邻接矩阵中随机选取一个邻居节点,作为下一个节点,记录如此长度为walk_length的游走序列,将其作为sentence输入到Word2Vec中。

如下是DeepWalk的简化版代码,我在随机游走的基础上加入了重启的选项,在一定程度上可以更好地捕捉局部结构信息、提高对重要节点的关注度,从而增强图嵌入的稳定性。

from networkx.classes.function import nodes
import numpy as np
import os
import networkx as nx
import random
from tqdm import tqdm

from gensim.models import Word2Vec


class deepwalk_RWorRWR:
    # 初始化函数,输入节点数目,边的txt文件地址
    def __init__(self, node_num: int, edge_txt: str, undirected=False):
        print("初始化图结构")
        # 先判断是有向的还是无向的
        if undirected:
            self.G = nx.Graph()
        else:
            self.G = nx.DiGraph()
        # 生成一个输入num数目的节点列表,放在图中
        self.G.add_nodes_from(list(range(node_num)))

        # 节点解决了,开始解决边,我们去定义一个读取str格式的边信息的函数read_edge
        edges = self.read_edge(edge_txt)
        self.G.add_edges_from(edges)

        # 获取图信息
        self.adjacency = np.array(nx.adjacency_matrix(self.G).todense())
        self.G_neighbor = {}
        # 获取每个节点的邻居列表,然后构成一个邻居字典,字典的key就是这个源节点的node
        for i in range(self.adjacency.shape[0]):
            self.G_neighbor[i] = []
            for j in range(self.adjacency.shape[0]):
                if i == j:
                    continue
                if self.adjacency[i, j] > 0.01:
                    self.G_neighbor[i].append(j)


    # 输入一个边的txt文件地址,返回一个元组列表,每一个元组表示一条边
    def read_edge(self, edge_txt):
        # loadtxt非常适合读取由简单分隔符(如空格)分隔的数字
        edges = np.loadtxt(edge_txt, dtype=np.int16)
        # 创建一个元组列表,每一个元组表示一条边
        edges = [(edges[i, 0], edges[i, 1]) for i in range(edges.shape[0])]

        return edges


    # 生成一个 随机游走/重启随机游走 序列
    def RWorRWR(self, path_len, alpha=0, rand_iter=random.Random(628), start=None):
        """_summary_

        Args:
            path_len (_type_): 指定随机游走的长度
            alpha (int, optional): 重启的概率
            rand_iter (_type_, optional): 随机数生成器
            start (_type_, optional): 指定随机游走的起始节点
        """
        # 把初始化函数中的邻居字典给了G
        G = self.G_neighbor

        if start:
            rand_path = [start]
        else:
            # 没指定就随机选一个节点出来,随机概率与节点的任何属性都没有关系
            # G.keys()返回的是一个动态的字典键试图对象,我们给他转成列表
            rand_path = [rand_iter.choice(list(G.keys()))]

        # 开始一次随机游走
        while len(rand_path) < path_len:
            current_pos = rand_path[-1]

            if len(G[current_pos]) > 0:  # 当当前节点有邻节点时
                if rand_iter.random() > alpha:
                    rand_path.append(rand_iter.choice(G[current_pos]))
                else:  # 重启
                    rand_path.append(rand_path[0])
            else:  # 当碰到孤立节点以后
                break

        return [str(node) for node in rand_path]


    # 构建语料库
    def build_corpus(self, num_paths, path_len, alpha=0, rand_iter=random.Random(628)):
        """_summary_

        Args:
            num_paths (_type_): 决定生成多少组随机游走序列,一组包含全部节点的随机游走序列
            path_len (_type_): 指定随机游走的长度
            alpha (int, optional): 重启的概率
            rand_iter (_type_, optional): 随机数生成器
        """
        if alpha:
            print("开始生成重启随机游走语料库")
        else:
            print("开始生成随机游走语料库")

        total_walks = []
        G = self.G_neighbor
        nodes = list(G.keys())

        for i in tqdm(range(num_paths)):
            # 将节点列表打乱
            rand_iter.shuffle(nodes)
            # 分别从每个节点开始进行一次随机游走
            for node in nodes:
                total_walks.append(self.RWorRWR(path_len, alpha, rand_iter, start=node))

        return total_walks


    # 训练word2vec模型
    def train(self, total_walks, embed_size=64, window_size=3, output="."):
        """_summary_

        Args:
            total_walks (_type_): 语料库
            embed_size (int, optional): 词向量维度
            window_size (int, optional): 窗口大小
            output (str, optional): 输出路径
        """
        print("开始训练word2vec模型")
        model = Word2Vec(
            total_walks,
            vector_size=embed_size,
            window=window_size,
            min_count=0,  # 词频阈值,如果一个词的频率小于min_count,那么这个词将被忽略
            sg=1,  # 训练算法的选择,如果sg是0,那么将使用CBOW算法;如果sg是1,那么将使用Skip-gram算法
            hs=1,  # 训练模型时是否使用Hierarchical Softmax,如果hs是0,那么将使用Negative Sampling;如果hs是1,那么将使用Hierarchical Softmax
            workers=8,  # 训练模型时使用的线程数,表示在训练模型时,可以同时运行的线程数
        )

        model.wv.save_word2vec_format(output)


# 进行训练
if __name__ == "__main__":
    # 生成一个有向图
    node_num = 2707
    edge_txt = "dataset/cora_edges.txt"
    G = deepwalk_RWorRWR(node_num, edge_txt)

    # 生成语料库
    num_paths = 10
    path_len = 10
    total_walks = G.build_corpus(num_paths, path_len)
    # 训练word2vec模型
    G.train(total_walks, embed_size=64, window_size=3, output="embedding_result_data/RW/10轮10步64维.txt")
    
    print("训练完成")

当然DeepWalk不是我们本文的重点,正如你所见,本文着重介绍工程方面提升效率的事情,我们将上述代码中随机游走部分的函数拿出来:

# 生成一个 随机游走/重启随机游走 序列
    def RWorRWR(self, path_len, alpha=0, rand_iter=random.Random(628), start=None):
        """_summary_

        Args:
            path_len (_type_): 指定随机游走的长度
            alpha (int, optional): 重启的概率
            rand_iter (_type_, optional): 随机数生成器
            start (_type_, optional): 指定随机游走的起始节点
        """
        # 把初始化函数中的邻居字典给了G
        G = self.G_neighbor

        if start:
            rand_path = [start]
        else:
            # 没指定就随机选一个节点出来,随机概率与节点的任何属性都没有关系
            # G.keys()返回的是一个动态的字典键试图对象,我们给他转成列表
            rand_path = [rand_iter.choice(list(G.keys()))]

        # 开始一次随机游走
        while len(rand_path) < path_len:
            current_pos = rand_path[-1]

            if len(G[current_pos]) > 0:  # 当当前节点有邻节点时
                if rand_iter.random() > alpha:
                    rand_path.append(rand_iter.choice(G[current_pos]))
                else:  # 重启
                    rand_path.append(rand_path[0])
            else:  # 当碰到孤立节点以后
                break

        return [str(node) for node in rand_path]


# 构建语料库
    def build_corpus(self, num_paths, path_len, alpha=0, rand_iter=random.Random(628)):
        """_summary_

        Args:
            num_paths (_type_): 决定生成多少组随机游走序列,一组包含全部节点的随机游走序列
            path_len (_type_): 指定随机游走的长度
            alpha (int, optional): 重启的概率
            rand_iter (_type_, optional): 随机数生成器
        """
        if alpha:
            print("开始生成重启随机游走语料库")
        else:
            print("开始生成随机游走语料库")

        total_walks = []
        G = self.G_neighbor
        nodes = list(G.keys())

        for i in tqdm(range(num_paths)):
            # 将节点列表打乱
            rand_iter.shuffle(nodes)
            # 分别从每个节点开始进行一次随机游走
            for node in nodes:
                total_walks.append(self.RWorRWR(path_len, alpha, rand_iter, start=node))

        return total_walks

这两个函数中,RWorRWR 函数输入一个node,然后生成一条以该node为起点的长度为path_len的游走序列。而build_corpus 则负责最外层的循环,num_paths决定了每个节点循环多少次,然后遍历图中的所有节点,确保每个节点都有从它出发的序列生成。由于已经进行了数据预处理剔除了孤立节点,所以我这里对于孤立节点的判断并不严谨,无伤大雅。

我们以4000个节点的网络为例,游走长度path_len 设置为50,每个节点重复游走次数num_paths设置为50,那么我们进行rand_iter.choice(G[current_pos]) 的次数就为4000*50*50=10,000,000 足足一千万次random.choice

详解Python random.choice(从序列中获取随机元素)函数的使用方法 - Python技术站 (pythonjishu.com)

我们写一个简单的程序进行一千万次,查看所需时间:

import random
import time

# 定义一个包含10个元素的列表
choices = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

# 记录开始时间
start_time = time.time()

# 执行一千万次 random.choice
for _ in range(10**7):
    random.choice(choices)

# 记录结束时间
end_time = time.time()

# 计算并打印所用时间
elapsed_time = end_time - start_time
print(f"Time taken for 10 million random.choice operations: {elapsed_time:.2f} seconds")
Time taken for 10 million random.choice operations: 3.30 seconds

可以看到花费大概在秒级,在实际游走时,所需时间大概在一分钟以内,还是比较快的。

2 个性化随机游走效率探究

我们考虑这样一种随机游走策略:

  • 普通随机游走(Random Walk, RW):从一个节点开始,在图中随机选择相邻节点进行游走,通常会根据图的结构均匀地选择下一个节点。

  • 重启随机游走(Random Walk with Restart, RWR):在普通随机游走的基础上,每一步都有一定概率返回到起始节点,这使得游走过程更加关注起始节点周围的局部结构。

  • 个性化随机游走(Personalized Random Walk, PRW):使用特定的亲和矩阵(affinity matrix),在每一步选择下一个节点时,考虑的是节点之间的亲和度,而不是简单的邻接关系。这个亲和矩阵是通过累加转移概率矩阵的不同幂次计算得到的。

简单来说,普通的随机游走每一步从邻居中随机选取一个,但是个性化随机游走每一步会从亲和矩阵中判断每个邻居跟当前节点的亲和度,以此来作为权重进行下一个节点的选择,体现在代码上就是:


    # 计算概率转移矩阵的函数
    def get_transition_matrix(self):
        print("计算概率转移矩阵")
        degrees = np.array(self.adjacency.sum(axis=1)).flatten()  # 计算每个节点的度
        degrees[degrees == 0] = 1  # 将度为0的节点度设置为1,避免除以0,后面不影响计算0/1结果还是0
        D_inv = diags(1.0 / degrees)  # 构建度的逆对角矩阵
        P = D_inv.dot(self.adjacency)  # 计算概率转移矩阵 P = D^{-1} * A
        return torch.tensor(P.toarray(), dtype=torch.float32).to(self.device)  # 转换为PyTorch张量并移动到GPU

    # 计算个性化随机游走的亲和矩阵并归一化
    # 参考公式:https://lovexl-oss.oss-cn-beijing.aliyuncs.com/bed/202407251421774.png
    def personalized_random_walk(self, alpha=0, max_iter=1000):
        print("     计算亲和矩阵,max_iter为", max_iter)
        P = self.transition_matrix
        M = torch.zeros_like(P).to(self.device)
        power_P = torch.eye(P.shape[0], device=self.device)  # 创建一个单位矩阵,P^0 = I
        
        if alpha == 0:
            for k in tqdm(range(max_iter), desc="Calculating affinity matrix (alpha=0)"):
                M += power_P  # 当 alpha = 0 时,M 为所有 P^k 的累加
                power_P = power_P.matmul(P)  # 计算 P^k
        else:
            for k in tqdm(range(max_iter), desc="Calculating affinity matrix"):
                M += alpha * (1 - alpha) ** k * power_P  # 计算亲和矩阵 M
                power_P = power_P.matmul(P)  # 计算 P^k

        # 对亲和矩阵进行归一化
        row_sums = M.sum(dim=1, keepdim=True)
        row_sums[row_sums == 0] = 1  # 避免除以0
        M = M / row_sums
        
        return M

    # 构建语料库的函数
    def build_corpus(self, path_len, alpha=0, max_iter=1000, rand_iter=random.Random(628)):
        print("开始生成个性化随机游走语料库:")
        total_walks = []
        M = self.personalized_random_walk(alpha, max_iter)  # 计算亲和矩阵,不移动到CPU
        M_cpu = M.cpu().numpy()  # 仅一次性转换整个矩阵到CPU
        nodes = list(self.G.nodes)  # 获取所有节点列表
        print("     开始生成随机游走序列")
        for _ in tqdm(range(50), desc="Generating random walks"):
            for start_node in nodes:
                walk = self.random_walk_from_matrix(M_cpu, start_node, path_len, rand_iter)  # 从亲和矩阵中生成随机游走序列
                total_walks.append(walk)
                
        # 打印亲和矩阵的前5行
        print("     亲和矩阵的前5行为:", M[:5])
        # 打印生成的随机游走序列的前5个
        print("     生成的随机游走序列的前5个为:", total_walks[:5])
        return total_walks

    # 从亲和矩阵中生成随机游走序列
    def random_walk_from_matrix(self, M_cpu, start, path_len, rand_iter):
        walk = [start]
        current_node = start
        for _ in range(path_len - 1):
            weights = M_cpu[current_node]
            weights_sum = weights.sum()
            if weights_sum == 0:
                # 如果权重总和为0,说明没有有效的下一个节点,停止游走
                break
            next_node = rand_iter.choices(range(M_cpu.shape[0]), weights=weights, k=1)[0]  # 根据归一化后的权重选择下一个节点
            walk.append(next_node)
            current_node = next_node
        return [str(node) for node in walk]

我们将目光放在本文的效率方面:

  1. 计算亲和矩阵:大小为4000*4000的矩阵做1000次乘法,出乎我所料的是,这个过程异常的迅速,我将其搬迁到了Torch上使用GPU进行运算,整个过程花费不到10秒钟,GPU,yyds!

  2. 进行个性化的随机游走:与方法1的一千万次rand_iter.choice(G[current_pos]) 不同的是,我们进行的是一千万次的rand_iter.choices(range(M_cpu.shape[0]), weights=weights, k=1)[0] ,这两个看似都是在进行random.choice,花费时间却是天差地别。

在下面第三节 3 两种 random.choice 中,通过我们的探究得知,rand_iter.choices(range(M_cpu.shape[0]), weights=weights, k=1)[0] 相较于普通的取随机的方法,计算复杂度要再上一个数量级(n个节点就要算n次权重),由原来的O(1)变为了O(n),所以实际上要做的运算就为4000*50*50*4000=10,000,000*4000 这是一个天文数字,一千万次的时间为3秒的话,那么这个翻了4000倍也足足需要200分钟才可以完成。在实际游走时,时间也确实从半分钟增加到了四十分钟,这是我们不能接受的。

3 两种 random.choice

在Python底层,普通的random.choice和带权重的random.choices在实现上有明显的区别。

3.1 普通 random.choice

random.choice 用于从一个序列(如列表或元组)中随机选择一个元素。其实现相对简单,主要步骤如下:

  1. 计算序列长度:确定序列中元素的数量。

  2. 生成随机索引:使用 random.randrange 生成一个在序列索引范围内的随机整数。

  3. 返回对应元素:根据生成的随机索引返回相应的元素。

伪代码示例:

def choice(sequence):
    length = len(sequence)
    random_index = random.randrange(length)
    return sequence[random_index]

3.2 带权重的 random.choices

random.choices 用于从一个序列中随机选择一个或多个元素,每个元素有不同的权重。其实现相对复杂,主要步骤如下:

  1. 计算累积权重:创建一个累积权重的列表,用于确定每个元素的选择概率区间。

  2. 生成随机数:在累积权重范围内生成一个随机数。

  3. 二分查找:使用二分查找在累积权重列表中定位随机数对应的元素。

  4. 返回对应元素:根据定位结果返回相应的元素。

伪代码示例:

def choices(sequence, weights, k=1):
    # 计算累积权重
    cum_weights = [sum(weights[:i+1]) for i in range(len(weights))]
    total = cum_weights[-1]
    result = []
    
    for _ in range(k):
        # 生成一个随机数
        random_number = random.uniform(0, total)
        # 使用二分查找确定随机数落在的权重区间
        idx = bisect.bisect(cum_weights, random_number)
        result.append(sequence[idx])
    
    return result

3.3 主要区别

复杂度方面

  • random.choice 只需要生成一个随机索引并访问序列对应位置的元素,时间复杂度为O(1)

  • random.choices 需要计算累积权重,并对每次选择进行二分查找,时间复杂度为O(n+k*logn),其中n是序列长度,k是选择次数。

使用场景

  • random.choice 适用于从均匀分布的序列中随机选择元素。

  • random.choices 适用于需要根据不同权重随机选择元素的场景,例如概率抽样、加权随机选择等。

实现细节

  • random.choice 使用了简单的索引和访问操作。

  • random.choices 需要计算累积权重并进行二分查找,因此实现更为复杂。

4 C语言混合编程效率优化

在尝试了Python的多线程、并行任务等效率优化的方法后,结果都不尽人意。譬如设置32个线程来执行任务,虽然每次的速度从十几秒提升到了一瞬间完成,但是消除、创建线程的时间也花费了很多,综合来看提升几乎微乎其微。

由于Python本身就是用C语言来实现的,感兴趣的朋友可以取了解一下CPython。所以我就想到了借助ctypes 来实现C语言与Python的混合编程,用C语言来实现这4000*50*50*4000=10,000,000*4000 次的random.choices ,然后编译生成动态链接库,这样就可以在Python中共享使用了。

以下是我的实现:

1.首先将随机游走序列的生成函数用C语言重写一遍。Python改C,算法思路都一样,其实还是挺容易的。

#include <stdio.h>
#include <stdlib.h>
#include <time.h>

// 生成单次随机游走序列的函数
void random_walk_from_matrix(float* matrix, int* walk, int start, int path_len, int n, unsigned int seed) {
    srand(seed);
    walk[0] = start;
    int current_node = start;

    for (int i = 1; i < path_len; ++i) {
        float weights_sum = 0.0;
        for (int j = 0; j < n; ++j) {
            weights_sum += matrix[current_node * n + j];
        }

        if (weights_sum == 0.0) {
            break;
        }

        float r = ((float) rand() / (float) RAND_MAX) * weights_sum;
        float cumulative_sum = 0.0;
        for (int j = 0; j < n; ++j) {
            cumulative_sum += matrix[current_node * n + j];
            if (cumulative_sum >= r) {
                walk[i] = j;
                current_node = j;
                break;
            }
        }
    }
}

// 生成所有节点随机游走序列的函数
void build_corpus(float* matrix, int* walks, int num_nodes, int path_len, int num_walks, int n) {
    int total_steps = num_walks * num_nodes;
    for (int w = 0; w < num_walks; ++w) {
        for (int start_node = 0; start_node < num_nodes; ++start_node) {
            unsigned int seed = (unsigned int) time(NULL) + w * num_nodes + start_node;
            random_walk_from_matrix(matrix, walks + (w * num_nodes + start_node) * path_len, start_node, path_len, n, seed);

            // 计算进度并显示
            int step = w * num_nodes + start_node + 1;
            int progress = (int) ((float) step / total_steps * 100);
            if (step % (total_steps / 100) == 0) {
                printf("\rProgress: %d%%", progress);
                fflush(stdout);
            }
        }
    }
    printf("\n");
}

2.然后编译该C文件,生成动态链接库。

  • Linux系统下:gcc -shared -o random_walk.so -fPIC random_walk.c

  • Windows系统下:gcc -shared -o random_walk.dll random_walk.c

-fPIC 是一个用于生成位置无关代码(Position Independent Code, PIC)的编译器选项。-fPIC 选项告诉编译器生成位置无关代码,这意味着生成的机器代码不依赖于内存中的特定地址,可以在任意位置加载和执行。这在创建共享库(如 .so 文件或 .dll 文件)时尤其重要,因为共享库可能会被多个程序加载到内存的不同位置。

3.在Python中调用C,替换掉原本的Python函数。

import ctypes

# 加载 C 共享库
lib = ctypes.CDLL('./random_walk.so')

    # 构建语料库的函数
    def build_corpus(self, path_len, alpha=0.15, max_iter=1000, rand_iter=random.Random(628)):
        print("开始生成个性化随机游走语料库:")
        M = self.personalized_random_walk(alpha, max_iter)  # 计算亲和矩阵,不移动到CPU
        M_cpu = M.cpu().numpy()  # 仅一次性转换整个矩阵到CPU
        nodes = list(self.G.nodes)  # 获取所有节点列表
        num_nodes = len(nodes)
        num_walks = 50  # 每个节点进行50次游走

        # 为存储所有随机游走序列创建数组
        total_walks = np.zeros((num_walks * num_nodes, path_len), dtype=np.int32)

        print("     C语言混合编程,开始生成随机游走序列")
        # 调用 C 函数生成随机游走序列
        lib.build_corpus.argtypes = [ctypes.POINTER(ctypes.c_float), ctypes.POINTER(ctypes.c_int), ctypes.c_int, ctypes.c_int, ctypes.c_int, ctypes.c_int]
        lib.build_corpus.restype = None

        matrix_ptr = M_cpu.ctypes.data_as(ctypes.POINTER(ctypes.c_float))
        walks_ptr = total_walks.ctypes.data_as(ctypes.POINTER(ctypes.c_int))

        start_time = time.time()  # 开始计时
        lib.build_corpus(matrix_ptr, walks_ptr, num_nodes, path_len, num_walks, num_nodes)
        end_time = time.time()  # 结束计时

        # 计算并打印总时长
        duration = end_time - start_time
        print(f"生成所有序列的总时长: {duration:.2f} 秒")

        # 将 numpy 数组转换为列表
        total_walks = total_walks.tolist()
        return total_walks

结果显示,整个游走时间大约在2分钟左右,相较于原本的40分钟,提升了20倍!

感慨一句,还是得C语言啊~