菜单
本页目录

威胁增强

1.1 attack_lab

synthesize_attack

def synthesize_attack(logins, attack_config, start_dt=None):
    """从头合成攻击的方法。

    参数:
        logins: pd.DataFrame : 每行一个登录事件
        attack_config: attack_lib.AttackPathConfig 对象
        start_dt: datetime.datetime 最早使用的登录时间
    返回:
        pd.DataFrame 合成的攻击登录事件
    """
    # 确保登录事件有适当的列/字段
    columns = logins.columns
    assert(
        LoginColumns.TIME in columns and
        LoginColumns.SRC in columns and
        LoginColumns.USER in columns and
        LoginColumns.DST in columns and
        LoginColumns.DATASET in columns
    )

    # 根据登录类型限制数据集
    if attack_config.protocol == 'ssh':
        data = logins[
            logins[LoginColumns.DATASET].str.lower().str.contains('ssh')]
    elif attack_config.protocol == 'windows':
        data = logins[
            logins[LoginColumns.DATASET].str.lower().str.contains('windows')]
    else:
        data = logins
    print("使用 {} {} 登录事件进行攻击合成\n".format(
        len(data), attack_config.protocol))

    # 生成攻击路径
    attack_generator = AttackPathGenerator(
        attack_config.attack_goal,
        start_state=attack_config.start_state,
        attacker_knowledge=attack_config.attacker_knowledge,
        stealth=attack_config.attack_stealth,
        src_preference=attack_config.src_preference,
        src_history_hrs=attack_config.src_history_hrs,
        compromise_cred_hrs=attack_config.compromise_cred_hrs,
        active_cred_hrs=attack_config.active_cred_hrs,
        target_machines=attack_config.target_machines,
        start_dt=start_dt
    )
    attack = attack_generator.make_attack(data)

    return attack

此函数用于从头开始合成攻击路径。

它接收一个包含登录事件的DataFrame和一个攻击配置对象,返回一个包含合成攻击路径的DataFrame。

is_synthetic_attack_successful

def is_synthetic_attack_successful(attack_df):
    """如果攻击未能切换到新的用户凭据,则攻击不成功。"""
    if attack_df is None or len(attack_df) == 0:
        return False
    users = attack_df[LoginColumns.USER].drop_duplicates()
    return len(users) >= 2

此函数用于判断合成攻击是否成功。如果攻击路径中未能切换到新的用户凭据,则认为攻击不成功。

AttackPathGenerator 类

初始化方法

class AttackPathGenerator(LoggingClass):
    """生成不同类型的横向移动路径。"""
    VERSION = 0

    def __init__(
        self, attack_goal, attacker_knowledge, stealth,
        start_state=None, compromise_cred_hrs=None,
        active_cred_hrs=None, src_history_hrs=None, src_preference=None,
        target_machines=set([]), verbose=True, start_dt=None
    ):
        """初始化攻击路径生成器。

        参数:
            attack_goal: MovementGoal 中的目标常量
            attacker_knowledge: AttackerCapabilities 中的知识常量
            stealth: MovementStealth 中的隐蔽性常量
            state_state: utils.AttackStart 对象
            compromise_cred_hrs: (int) 凭据在最后一次用户登录后仍然可被利用的小时数
            active_cred_hrs: (int) 检测器用于因果链接两次登录事件的小时数,适用于隐蔽攻击
            src_history_hrs: (int) 攻击者可以挖掘的机器最近登录历史的小时数
            src_preference: MovementConstraints 中的 SRC_PREF 常量
            target_machines: 目标攻击的高价值主机名集合(字符串)
        """
        super(AttackPathGenerator, self).__init__(verbose=verbose)
        self.real_users = get_all_users()

        # 随机采样间隔时间(秒)
        # 在 [0, self.interarrival_window_hrs] 之间,为下一次攻击跳跃设置时间间隔
        self.interarrival_window_hrs = 2

        self.start_dt = start_dt
        self.attack_start = start_state
        self.attack_history = None
        self.attack_capabilities = AttackerCapabilities(
            attacker_knowledge, compromise_cred_hrs)
        self.attack_constraints = MovementConstraints(src_preference)
        self.attack_stealth = MovementStealth(
            stealth, active_cred_hrs=active_cred_hrs,
            src_history_hrs=src_history_hrs)
        self.attack_goal = MovementGoal(
            attack_goal, target_machines=target_machines)

该类用于生成不同类型的横向移动攻击路径。

初始化方法设置了攻击的目标、攻击者的知识、隐蔽性、起始状态、凭据暴露时间、活动凭据时间、源历史时间、源偏好以及目标机器等。

make_attack 方法

    def make_attack(self, logins):
        """生成攻击登录事件的主方法。"""
        logins = self._preprocess_logins(logins)

        # 初始化起始跳跃
        self._initialize_start(logins)

        # 迭代生成下一个跳跃,直到达到攻击目标,或攻击失去选择
        while not self.attack_goal.is_attack_complete(self.attack_history):
            next_time, next_src, next_dst, next_user = self._get_next_hop(logins)

            if next_dst is None:
                # 如果没有更多的机器可以移动到,则终止
                self.log("\n警告:攻击已失去潜在目标。终止!!!\n")
                break
            else:
                # 打印进度信息
                self.log("选择下一个跳跃:"
                         "(下一跳时间 = {}, 源 = {}, 目标 = {}, 用户 = {}。".format(
                    next_time, next_src, next_dst, next_user
                ))

            # 合成一个新的登录事件,给定
            # (1) 起始机器,(2) 用户,(3) 目标机器
            new_hop = LoginSynthesizer().synthesize_login(
                logins, next_time, next_src, next_dst, next_user)
            new_hop.loc[:, LoginColumns.ATTACK] = True

            # 进行横向移动到新的目标并更新状态
            self._make_next_hop(new_hop, logins)

        return self.attack_history.attack_path.reset_index(drop=True)

此方法是生成攻击登录事件的主方法。它先预处理登录事件,然后初始化起始跳跃,接着迭代生成下一个跳跃,直到达到攻击目标或攻击失去选择。

_preprocess_logins 方法

    def _preprocess_logins(self, logins):
        """辅助方法:预处理登录事件以生成攻击。"""
        logins = logins[
            (logins[LoginColumns.TIME] >= self.start_dt)
        ]

        # 过滤只包含真实用户的登录事件
        logins = logins[logins[LoginColumns.USER].isin(self.real_users)]

        # 过滤掉来自忽略集机器的登录事件

        # 添加你想应用的其他过滤

        return logins

此方法预处理登录事件以生成攻击。它过滤出开始时间之后的登录事件,并只保留包含真实用户的登录事件。

_engineer_attack_start 方法

    def _engineer_attack_start(self, logins):
        """根据隐蔽性工程攻击起点以确保成功。"""
        print("工程攻击起点。\n")

        i = 0
        while self.attack_start is None and i < 100:
            i += 1
            try:
                # 根据指定的攻击类型工程攻击起始状态
                if self.attack_stealth.stealth == MovementStealth.STEALTH_ACTIVE_CREDS:
                    self.attack_start = AttackStart(AttackStart.START_AMBIG_PATH)
                elif self.attack_stealth.stealth == MovementStealth.STEALTH_ENDPOINTS:
                    self.attack_start = AttackStart(AttackStart.START_LONG_PATH)
                elif self.attack_stealth.stealth == MovementStealth.STEALTH_FULL:
                    self.attack_start = AttackStart(AttackStart.START_STEALTH_PATH)
                else:
                    self.attack_start = AttackStart()
                self.attack_start.initialize(logins)
            except:
                self.attack_start = None
                print("攻击起点生成失败:{}".format(i))

此方法根据攻击的隐蔽性来工程攻击起点,以确保攻击成功。它尝试不同的起始状态,直到找到合适的起点或尝试次数达到上限。

_initialize_start 方法

    def _initialize_start(self, logins):
        """辅助方法:选择初始妥协起点和时间。"""
        # 初始化攻击起点
        if self.attack_start is None:
            i = 0
            print("随机化攻击起点。\n")
            self.attack_start = AttackStart()
            self.attack_start.initialize(logins)
        else:
            print("预先指定的起始状态:{}\t{}\t{}".format(
                self.attack_start.start_time, self.attack_start.start_src,


                self.attack_start.start_user
            ))

        # 初始化起始状态以填补任何未指定的值
        self.attack_start.initialize(logins)

        # 初始化目标
        self.attack_goal.target_info.initialize(
            self.attack_start.start_src, self.attack_start.start_time, logins)

        # 填充攻击历史
        self.attack_history = AttackHistory(self.attack_start)

        # 初始化攻击者的能力
        compromised_users = set([self.attack_start.start_user,])
        foothold = self.attack_start.start_src
        self.attack_capabilities.initialize_capabilities(self.attack_start, logins)
        self.attack_stealth.update_knowledge(
            self.attack_start.start_time, foothold, logins)
        self.attack_history.update_compromised_creds(foothold, compromised_users)
        self.attack_goal.update_progress(foothold, compromised_users)

此方法选择初始妥协起点和时间。它初始化攻击起点、目标、攻击历史和攻击者的能力。

_make_next_hop 方法

    def _make_next_hop(self, new_hop, logins):
        """辅助方法:基于攻击移动到下一跳更新状态。"""
        self.log("移动到新的攻击边:")
        self.log(list(new_hop[LOGIN_ANALYSIS_COLUMNS].itertuples(index=False))[0])

        new_time = new_hop[LoginColumns.TIME].iloc[0]
        new_dst = new_hop[LoginColumns.DST].iloc[0]

        compromised_users = self.attack_capabilities.update_capabilities(
            new_time, new_dst, logins)
        self.attack_stealth.update_knowledge(new_time, new_dst, logins)
        self.attack_history.add_new_hop(new_hop)
        self.attack_history.update_compromised_creds(new_dst, compromised_users)
        self.attack_goal.update_progress(new_dst, compromised_users)

此方法基于攻击移动到下一跳更新状态。它更新攻击者的能力、知识、攻击历史和攻击目标的进展。

_get_next_hop 方法

    def _get_next_hop(self, logins):
        """辅助方法:选择下一次攻击跳跃。"""
        self.log("生成攻击跳跃 #{}".format(self.attack_history.num_hops))

        # 生成下一次攻击跳跃的移动时间
        next_interarrival = random.randint(1, 3600 * self.interarrival_window_hrs)
        next_time = (
            self.attack_history.last_move_time +
            datetime.timedelta(seconds=next_interarrival)
        )

        # 根据攻击者的能力和知识生成下一跳的候选集
        candidate_next_hops = self.attack_capabilities.get_candidate_next_hops(
            self.attack_history)

        # 根据隐蔽性约束/修剪候选下一跳
        candidate_next_hops = self.attack_stealth.constrain_next_hops(
            candidate_next_hops, self.attack_history)

        # 根据领域知识和威胁模型约束/修剪候选下一跳
        candidate_next_hops = self.attack_constraints.constrain_next_hops(
            candidate_next_hops, self.attack_history)

        # 根据攻击目标选择下一跳
        next_hop = self.attack_goal.select_next_hop(
            candidate_next_hops, self.attack_history, self.attack_capabilities)

        return (next_time, next_hop.src, next_hop.dst, next_hop.user)

此方法选择下一次攻击跳跃。它生成下一次攻击跳跃的移动时间,根据攻击者的能力和知识生成候选集,并根据隐蔽性和领域知识修剪候选集,最终选择下一次攻击跳跃。

AttackPathConfig 类

class AttackPathConfig(LoggingClass):
    """封装攻击路径配置。"""
    def __init__(
        self, attack_goal, attacker_knowledge, stealth, protocol,
        start_state=None,
        src_preference=MovementConstraints.SRC_PREF_NONE,
        compromise_cred_hrs=AttackerCapabilities.DEFAULT_CRED_EXPOSED_HRS,
        active_cred_hrs=MovementStealth.ACTIVE_CRED_HRS,
        src_history_hrs=MovementStealth.DEFAULT_SRC_HISTORY_HRS,
        target_machines=set([])
    ):
        """初始化攻击路径生成器。"""
        self.attack_goal = attack_goal
        self.attacker_knowledge = attacker_knowledge
        self.attack_stealth = stealth
        self.protocol = protocol

        self.start_state = start_state
        self.src_preference = src_preference
        self.compromise_cred_hrs = compromise_cred_hrs
        self.active_cred_hrs = active_cred_hrs
        self.src_history_hrs = src_history_hrs
        self.target_machines = target_machines

    def get_file_suffix(self):
        """获取描述此攻击配置的后缀。"""
        suffix = ".{}.{}.{}.protocol={}.df"
        suffix = suffix.format(
            self.attack_goal, self.attack_stealth,
            self.attacker_knowledge, self.protocol
        )
        return suffix

    def __str__(self):
        """返回攻击配置的字符串表示。"""
        if self.start_state:
            start_str = "攻击起点:时间 = {}, 源 = {}, 用户 = {}, 工程 = {}。".format(
                self.start_state.start_time, self.start_state.start_src,
                self.start_state.start_user, self.start_state.start_strategy
            )
        else:
            start_str = "攻击起点:未指定。"

        main_str = ("攻击目标:{}。\t攻击者知识:{}。\t攻击隐蔽性:{}。"
                    "\t登录协议:{}\t源偏好:{}。\t目标机器:{}".format(
            self.attack_goal, self.attacker_knowledge, self.attack_stealth,
            self.protocol, self.src_preference, self.target_machines
        ))

        auxil_str = (
            "凭据暴露时间窗口:{} 小时。"
            "\t活动凭据时间窗口:{} 小时。"
            "\t源-目标历史时间窗口:{} 小时。"
        ).format(self.compromise_creds_hrs, self.active_cred_hrs, self.src_history_hrs)

        final_str = "{}\n{}\n{}".format(start_str, main_str, auxil_str)
        return final_str

该类封装了攻击路径的配置。初始化方法设置了攻击的目标、攻击者的知识、隐蔽性、协议、起始状态、源偏好、凭据暴露时间、活动凭据时间、源历史时间和目标机器等。

1.2 data_type

这段代码定义了一些常量类、数据列类和基础类,以支持网络攻击模拟中的数据处理和日志记录。

  • 常量类 定义了数据列名、移动类型和攻击场景中的常量。
  • LoggingClass 提供了简单的日志记录机制。
  • AttackHistory 跟踪攻击路径的历史和状态,管理已访问的目标机器和被妥协的凭据。
  • AttackNextHop 命名元组用于结构化表示攻击路径中的下一跳。

定义常量和数据列类

LoginColumns 类

class LoginColumns(object):
    """Pandas数据的列名常量。"""
    TIME = "time"
    INDEX_TIME = 'indextime'
    SRC = "src"
    DST = "dst"
    USER = "user"
    MOVEMENT_TYPE = "movement_type"
    PROTOCOL = 'protocol'  # ssh vs. Windows
    DATASET = "dataset"
    ATTACK = 'is_attack'  # groundtruth label: attack == True, else False
    ATTACK_ID = 'attack_id'  # ID for synthetic attack this corresponds to

LoginColumns 类定义了一组常量,表示登录数据中常用的列名。这些常量使得代码更具可读性和维护性。

EnrichmentColumns 类

class EnrichmentColumns(object):
    """定义中间/丰富相关列的类。"""
    SRC_SUBNET = 'src_subnet'
    SRC_LOCATION = "src_location"
    LOCATION = "location"
    MACHINE_AGE = "machine_age"
    MACHINE_EARLIEST_DATE = "machine_first_date"
    NUM_INBOUND_DAYS = "src_n_days_recv_inbound_success"
    SRC_CLIENT = "is_src_client"
    DST_CLIENT = "is_dst_client"
    SRC_OWNER = "owner"
    USER_TEAM = "user_team"
    USER_AGE = "user_age"  

EnrichmentColumns 类定义了一组常量,表示用于丰富数据的中间列名。

MovementTypes 类

class MovementTypes(object):
    MOVE_FROM_CLIENT = "movement:client-server"
    MOVE_INTO_CLIENT = "movement:into-client"
    MOVE_FROM_SERVER = "movement:server-server"

MovementTypes 类定义了几种移动类型,用于描述网络中节点之间的移动方式。

ScenarioConstants 类

class ScenarioConstants(object):
    GOAL_EXPLORATION = "goal=exploration"
    GOAL_SPREAD = "goal=aggressive-spread"
    GOAL_TARGETED = "goal=targeted"
    STEALTH_NONE = "stealth=agnostic"
    STEALTH_ENDPOINTS = "stealth=only-prev-src-dst-combos"
    STEALTH_ACTIVE_CREDS = "stealth=only-active-src-user-combos"
    STEALTH_FULL = "stealth=full-stealthiness"

ScenarioConstants 类定义了几种攻击目标和隐蔽性策略的常量。这些常量用于配置攻击模拟场景。

基础类

LoggingClass 类

class LoggingClass(object):
    """启用不同的日志输出。"""
    def __init__(self, verbose=True):
        self.verbose = verbose

    def log(self, msg):
        """辅助方法:根据是否详细模式记录消息。"""
        if self.verbose:
            print(msg)

LoggingClass 类提供了一个简单的日志记录机制,根据 verbose 参数决定是否打印日志消息。

AttackHistory 类

class AttackHistory(LoggingClass):
    """跟踪攻击的历史和状态。"""
    def __init__(self, start_state, verbose=True):
        super(AttackHistory, self).__init__(verbose=verbose)
        self.start_state = start_state

        # 跟踪攻击历史
        self.attack_path = pd.DataFrame()
        self.num_hops = 1

        # 记录每个目标机器上的被妥协凭据
        self.compromised_creds_per_dst = dict()
        self.visited_dst = set([self.start_state.start_src,])
        self.cur_machine = self.start_state.start_src
        self.cur_user = self.start_state.start_user
        self.last_move_time = self.start_state.start_time

    def get_start_accessible_dst(self):
        """获取起始用户有权限访问的机器集合。"""
        return self.start_state.start_accessible_dst

    def get_start_src(self):
        return self.start_state.start_src

    def get_start_user(self):
        return self.start_state.start_user

    def get_current_user(self):
        return self.cur_user

    def get_current_machine(self):
        return self.cur_machine

    def get_visited_dst(self):
        return self.visited_dst

    def add_new_hop(self, new_hop):
        """用新的跳跃更新攻击历史。"""
        self.attack_path = pd.concat([self.attack_path, new_hop], sort=False)
        self.cur_user = new_hop[LoginColumns.USER].iloc[0]

        new_machine = new_hop[LoginColumns.DST].iloc[0]
        self.cur_machine = new_machine
        self.visited_dst.add(new_machine)

        self.last_move_time = new_hop[LoginColumns.TIME].iloc[0]
        self.num_hops += 1

    def update_compromised_creds(self, machine, compromised_creds):
        """更新每台机器上的被妥协凭据。"""
        self.compromised_creds_per_dst[machine] = self.compromised_creds_per_dst.get(
            machine, set([])
        ) | compromised_creds

AttackHistory 类跟踪攻击的历史和状态,包括攻击路径、已访问的目标机器、当前机器和用户等。它还提供了方法来更新攻击历史和妥协凭据。

AttackNextHop 命名元组

AttackNextHop = namedtuple(
    'AttackNextHop', [LoginColumns.SRC, LoginColumns.DST, LoginColumns.USER])

AttackNextHop 是一个命名元组,用于表示攻击路径中的下一跳,包括源、目标和用户。它提供了一种结构化方式来传递攻击路径中的信息。

1.3 attack_capabilities

下面是对代码中每个函数的详细分析:

AttackerCapabilities 类

初始化方法

class AttackerCapabilities(LoggingClass):
    """在攻击过程中封装攻击者的能力。"""
    DEFAULT_CRED_EXPOSED_HRS = 24*7  # 默认凭据暴露时间为7天
    KNOWLEDGE_LOCAL = "knowledge=local"  # 攻击者只知道他们移动到的机器的历史
    KNOWLEDGE_GLOBAL = "knowledge=global"  # 攻击者知道整个网络拓扑

    def __init__(self, knowledge, compromise_cred_hrs=None, verbose=True):
        """
        参数:
            knowledge: KNOWLEDGE_ 类常量
            compromise_cred_hrs: (int) 凭据在最后一次用户登录后仍然可被利用的小时数
        """
        super(AttackerCapabilities, self).__init__(verbose=verbose)
        self.knowledge = knowledge
        self.dst_per_compromised_user = dict()  # 记录每个被妥协用户可访问的目标
        self.real_users = get_all_users()  # 获取所有真实用户

        # 设置凭据暴露时间
        self.compromise_cred_hrs = compromise_cred_hrs
        if self.compromise_cred_hrs is None:
            self.compromise_cred_hrs = self.DEFAULT_CRED_EXPOSED_HRS

        self.log("攻击者能力设置为: {}\t凭据暴露窗口={} 小时\n".format(
            self.knowledge, self.compromise_cred_hrs
        ))

get_accessible_dst_for_user

    @classmethod
    def get_accessible_dst_for_user(cls, logins, user, protocol=None):
        """获取用户可以访问的所有目标。

        通过构建用户在整个数据集中访问过的目标集合进行近似。
        """
        matching_logins = logins[
            (logins[LoginColumns.USER] == user)
        ]

        if protocol is None or not (LoginColumns.PROTOCOL in logins.columns):
            pass
        else:
            matching_logins = matching_logins[
                matching_logins[LoginColumns.PROTOCOL] == protocol
            ]

        # 目前,可访问的目标 = 用户成功访问过的目标
        # 这是用户实际权限/可访问目标的子集
        prior_dst = set(matching_logins[LoginColumns.DST].drop_duplicates())
        prior_dst = prior_dst - UNINTERESTING_DST

        return prior_dst

这个方法获取用户在过去的登录中访问过的所有目标节点,并返回这些目标节点的集合。

initialize_capabilities

    def initialize_capabilities(self, start_state, logins):
        """初始化攻击者的能力。"""
        self._update_candidate_dst(logins, start_state.start_user)

这个方法初始化攻击者的能力,具体通过更新初始用户可访问的目标节点集合。

update_capabilities

    def update_capabilities(self, new_time, new_dst, logins):
        """更新能力集:被妥协的凭据集和目标集。

        参数:
            new_time: datetime.datetime 新的时间点
            new_dst: str 新的目标节点
            logins: pd.DataFrame 所有考虑的登录事件
        """
        compromised_users = self._get_new_compromised_users(
            logins, new_time, new_dst)
        for (user, protocol) in compromised_users:
            self._update_candidate_dst(logins, user, protocol)

        compromised_users = set([u for (u, protocol) in compromised_users])
        return compromised_users

这个方法在攻击者成功移动到新目标节点后更新其能力集,包括被妥协的凭据和可访问的目标节点。

get_candidate_next_hops

    def get_candidate_next_hops(self, attack_history):
        """生成所有可能的移动跳跃。

        参数:
            attack_history: data_types.AttackHistory 对象
        返回:
            [AttackNextHop namedtuple 的列表]
        """
        candidate_hops = []
        candidate_srcs = self._get_candidate_src(attack_history)
        self.log("候选跳跃: 候选源 = {}".format(candidate_srcs))

        next_hops = flatten_list([
            [AttackNextHop(src, dst, user) for src in candidate_srcs]
            for (user, dst) in self._get_candidate_user_dst()
        ])

        self.log("候选跳跃: 生成了 {} 个可能的跳跃".format(len(next_hops)))
        return next_hops

这个方法生成所有可能的移动跳跃,即攻击者可能的下一步行动。它首先获取候选源节点,然后生成每个用户-目标节点对的所有可能跳跃。

_get_candidate_src

    def _get_candidate_src(self, attack_history):
        """辅助方法:获取下一跳的候选源 = 所有访问过的机器。"""
        return attack_history.visited_dst

这个方法返回攻击历史中所有访问过的目标节点,作为下一跳的候选源节点。

_get_candidate_user_dst

    def _get_candidate_user_dst(self):
        """辅助方法:获取可访问的 (user, dst) 移动对。"""
        user_and_dst_pairs = flatten_list([
            [(user, dst) for dst in self.dst_per_compromised_user[user]]
            for user in self.dst_per_compromised_user
        ])
        self.log("候选跳跃: {} 个候选 (user, dst) 对".format(len(user_and_dst_pairs)))

        return user_and_dst_pairs

这个方法获取所有可访问的用户-目标节点对,用于生成候选跳跃。

_get_new_compromised_users

    def _get_new_compromised_users(self, logins, time, host):
        """辅助方法:模拟攻击者在主机上妥协凭据。

        返回:
            set([tuple(user, protocol)])
        """
        if host in NON_COMPROMISE_HOSTS or not is_compromisable_host(host):
            self.log(
                "更新被妥协的凭据:\t主机 = {} 是不可妥协的主机,跳过凭据妥协".format(host))
            return set([])

        # 确定最近登录到新妥协主机的所有登录
        lower_bound = time - datetime.timedelta(hours=self.compromise_cred_hrs)
        vuln_logins = logins[
            (logins[LoginColumns.DST] == host) &
            (logins[LoginColumns.TIME] <= time) &
            (logins[LoginColumns.TIME] >= lower_bound)
        ]

        # 确定最近登录到主机的所有用户(可能缓存的凭据)
        vuln_users = [
            (r[LoginColumns.USER], r[LoginColumns.PROTOCOL]) for idx, r in
            vuln_logins[
                [LoginColumns.USER, LoginColumns.PROTOCOL]
            ].drop_duplicates().iterrows()
        ]

        # '妥协'所有真实用户名
        compromised_users = set([
            (u, protocol) for (u, protocol) in vuln_users if u in self.real_users])
        self.log("更新被妥协的凭据:\t"
                 "主机 = {} 在过去 {} 小时内有 {} 个用户登录。\n"
                 "妥协的(真实)用户集 = {}".format(
                     host, len(vuln_users),
                     self.compromise_cred_hrs, compromised_users
        ))

        return compromised_users

这个方法模拟攻击者在目标主机上妥协凭据。它识别最近登录到目标主机的用户,并返回这些用户和协议的集合。

_update_candidate_dst

    def _update_candidate_dst(self, logins, user, protocol=None):
        """辅助方法:更新攻击者可以移动到的目标集。"""
        if user in self.dst_per_compromised_user:
            # 节省计算:我们可以忽略更新目标集
            return

        # 确定用户(凭据)以前访问过的目标 = 候选目标
        prior_dst = self.get_accessible_dst_for_user(logins, user, protocol=protocol)

        # 记录用户的凭据提供访问的目标
        self.dst_per_compromised_user[user] = \
            prior_dst | self.dst_per_compromised_user.get(user, set([]))

        self.log("使用 {} 个目标更新候选目标 "
                 "(示例: {}) 为被妥协用户 = {}。\n".format(
            len(prior_dst), safe_rand_sample(prior_dst, 5), user
        ))

这个方法更新攻击者可以移动到的目标集。它识别用户以前访问过的目标节点,并将这些节点加入候选目标集中。

总结

AttackerCapabilities 类封装了攻击者在攻击过程中拥有的资源和能力。它包括初始化攻击者的能力、更新妥协凭据和目标集、生成候选跳跃等功能。这些方法使得攻击模拟更加真实和复杂,能够更好地评估网络防御机制的有效性。

1.4 attack_lab架构解析

Python 文件架构分析及函数之间的依赖关系

这个 Python 文件主要用于模拟和合成网络攻击路径,包括生成初始妥协点、生成攻击路径和评估攻击是否成功。

  1. synthesize_attack 函数:主函数,用于合成攻击路径。
  2. is_synthetic_attack_successful 函数:判断合成攻击是否成功。
  3. AttackPathGenerator 类:生成攻击路径的核心类。
  4. AttackPathConfig 类:封装攻击路径配置。

函数和类之间的依赖关系

synthesize_attack 函数

synthesize_attack 函数是整个攻击合成过程的入口点。它依赖于以下内容:

  • AttackPathConfig 类:传入的 attack_config 对象,用于配置攻击路径的各种参数。
  • AttackPathGenerator 类:用于生成攻击路径的核心类,实例化后调用其 make_attack 方法生成攻击路径。

is_synthetic_attack_successful 函数

def is_synthetic_attack_successful(attack_df):
    """如果攻击未能切换到新的用户凭据,则攻击不成功。"""
    if attack_df is None或len(attack_df) == 0:
        return False
    users = attack_df[LoginColumns.USER].drop_duplicates()
    return len(users) >= 2

is_synthetic_attack_successful 函数用于判断合成的攻击是否成功。它依赖于:

  • attack_df:由 synthesize_attack 函数生成的攻击路径数据帧。

AttackPathGenerator 类

AttackPathGenerator 类是生成攻击路径的核心类。其主要方法和内部依赖如下:

  1. 初始化方法
class AttackPathGenerator(LoggingClass):
    """生成不同类型的横向移动路径。"""
    VERSION = 0

    def __init__(
        self, attack_goal, attacker_knowledge, stealth,
        start_state=None, compromise_cred_hrs=None,
        active_cred_hrs=None, src_history_hrs=None, src_preference=None,
        target_machines=set([]), verbose=True, start_dt=None
    ):
        super(AttackPathGenerator, self).__init__(verbose=verbose)
        self.real_users = get_all_users()
        self.interarrival_window_hrs = 2
        self.start_dt = start_dt
        self.attack_start = start_state
        self.attack_history = None
        self.attack_capabilities = AttackerCapabilities(
            attacker_knowledge, compromise_cred_hrs)
        self.attack_constraints = MovementConstraints(src_preference)
        self.attack_stealth = MovementStealth(
            stealth, active_cred_hrs=active_cred_hrs,
            src_history_hrs=src_history_hrs)
        self.attack_goal = MovementGoal(
            attack_goal, target_machines=target_machines)
  • AttackerCapabilities:攻击者的能力,决定了可以执行哪些攻击动作。
  • MovementConstraints:攻击者的约束,决定了攻击路径的生成规则。
  • MovementStealth:攻击的隐蔽性配置。
  • MovementGoal:攻击的目标配置。
  1. make_attack 方法
    def make_attack(self, logins):
        """生成攻击登录事件的主方法。"""
        logins = self._preprocess_logins(logins)
        self._initialize_start(logins)
        while not self.attack_goal.is_attack_complete(self.attack_history):
            next_time, next_src, next_dst, next_user = self._get_next_hop(logins)
            if next_dst is None:
                self.log("\n警告:攻击已失去潜在目标。终止!!!\n")
                break
            else:
                self.log("选择下一个跳跃:"
                         "(下一跳时间 = {}, 源 = {}, 目标 = {}, 用户 = {}。".format(
                    next_time, next_src, next_dst, next_user
                ))
            new_hop = LoginSynthesizer().synthesize_login(
                logins, next_time, next_src, next_dst, next_user)
            new_hop.loc[:, LoginColumns.ATTACK] = True
            self._make_next_hop(new_hop, logins)
        return self.attack_history.attack_path.reset_index(drop=True)
  • LoginSynthesizer:用于合成新的登录事件。
  • _preprocess_logins_initialize_start_get_next_hop_make_next_hop 等内部方法:用于处理登录事件、初始化攻击起点、生成下一次攻击跳跃并更新攻击历史。

AttackPathConfig 类

class AttackPathConfig(LoggingClass):
    """封装攻击路径配置。"""
    def __init__(
        self, attack_goal, attacker_knowledge, stealth, protocol,
        start_state=None,
        src_preference=MovementConstraints.SRC_PREF_NONE,
        compromise_cred_hrs=AttackerCapabilities.DEFAULT_CRED_EXPOSED_HRS,
        active_cred_hrs=MovementStealth.ACTIVE_CRED_HRS,
        src_history_hrs=MovementStealth.DEFAULT_SRC_HISTORY_HRS,
        target_machines=set([])
    ):
        self.attack_goal = attack_goal
        self.attacker_knowledge = attacker_knowledge
        self.attack_stealth = stealth
        self.protocol = protocol
        self.start_state = start_state
        self.src_preference = src_preference
        self.compromise_cred_hrs = compromise_cred_hrs
        self.active_cred_hrs = active_cred_hrs
        self.src_history_hrs = src_history_hrs
        self.target_machines = target_machines
  • 该类用于封装攻击路径配置,包含攻击目标、攻击者知识、隐蔽性、协议、起始状态等。

函数和类的相互依赖关系总结

  • synthesize_attack 依赖 AttackPathConfig 来配置攻击路径,并使用 AttackPathGenerator 来生成攻击路径。
  • AttackPathGenerator 依赖 AttackerCapabilitiesMovementConstraintsMovementStealthMovementGoal 等类来生成攻击路径。
  • is_synthetic_attack_successful 依赖 synthesize_attack 生成的攻击路径数据帧来判断攻击是否成功。

实现威胁增强

威胁增强通过以下步骤实现:

  1. 配置攻击路径:使用 AttackPathConfig 配置攻击路径的目标、攻击者的知识、隐蔽性等参数。
  2. 生成攻击路径:使用 AttackPathGenerator 生成攻击路径,具体包括初始化起点、生成每个跳跃、更新攻击历史等。
  3. 合成攻击事件:在每个攻击跳跃中,使用 LoginSynthesizer 合成新的登录事件,模拟攻击者的行为。
  4. 评估攻击成功与否:使用 is_synthetic_attack_successful 判断生成的攻击路径是否成功。

这种威胁增强方法通过模拟真实攻击路径,生成具有代表性的攻击数据,帮助网络安全研究人员和从业人员评估和改进安全防护机制。

1.5 特点

  1. 规则和策略:根据预定义的攻击目标、攻击者知识、隐蔽性等策略,模拟真实攻击中的横向移动路径。这些策略和规则帮助生成看似真实的攻击路径。

  2. 数据处理和过滤:通过对登录数据的预处理和过滤,确保选择的初始妥协点和后续攻击路径符合实际场景中的攻击模式。例如,过滤掉系统管理员、确保用户和机器有足够的历史数据等。

  3. 随机性:在选择初始妥协点、下一跳攻击目标等过程中引入随机性,以模拟攻击者的不确定行为。这种随机性帮助生成多样化的攻击路径,提高模拟攻击的真实性。

代码实现威胁增强的具体方法

1. 规则和策略

代码通过 AttackPathConfig 类来配置攻击路径的目标、攻击者的知识、隐蔽性等参数,这些参数会影响攻击路径的生成规则和策略。

class AttackPathConfig(LoggingClass):
    """封装攻击路径配置。"""
    def __init__(
        self, attack_goal, attacker_knowledge, stealth, protocol,
        start_state=None,
        src_preference=MovementConstraints.SRC_PREF_NONE,
        compromise_cred_hrs=AttackerCapabilities.DEFAULT_CRED_EXPOSED_HRS,
        active_cred_hrs=MovementStealth.ACTIVE_CRED_HRS,
        src_history_hrs=MovementStealth.DEFAULT_SRC_HISTORY_HRS,
        target_machines=set([])
    ):
        """初始化攻击路径生成器。"""
        self.attack_goal = attack_goal
        self.attacker_knowledge = attacker_knowledge
        self.attack_stealth = stealth
        self.protocol = protocol

        self.start_state = start_state
        self.src_preference = src_preference
        self.compromise_cred_hrs = compromise_cred_hrs
        self.active_cred_hrs = active_cred_hrs
        self.src_history_hrs = src_history_hrs
        self.target_machines = target_machines

2. 数据处理和过滤

代码通过 AttackPathGenerator 类中的 _preprocess_logins 方法来预处理登录数据,确保选择的初始妥协点和攻击路径符合实际场景中的攻击模式。

def _preprocess_logins(self, logins):
    """辅助方法:预处理登录事件以生成攻击。"""
    logins = logins[
        (logins[LoginColumns.TIME] >= self.start_dt)
    ]

    # 过滤只包含真实用户的登录事件
    logins = logins[logins[LoginColumns.USER].isin(self.real_users)]

    # 过滤掉来自忽略集机器的登录事件

    # 添加你想应用的其他过滤

    return logins

3. 随机性

在选择初始妥协点、下一跳攻击目标等过程中引入随机性,以模拟攻击者的不确定行为。

def _select_start_user_random(self, logins, candidate_start_users=None):
    """选择随机起始用户。"""
    # 起始妥协机器应该是客户端
    logins = logins[logins[EnrichmentColumns.SRC_CLIENT] == True]
    if self.start_time:
        logins = logins[
            (logins[LoginColumns.TIME] >= self.start_time - datetime.timedelta(hours=24)) &
            (logins[LoginColumns.TIME] <= self.start_time + datetime.timedelta(hours=24))
        ]

    # 随机选择一个起始受害者/妥协用户
    if not candidate_start_users:
        candidate_start_users = self._get_viable_start_users(logins)
    self.start_user = random.sample(candidate_start_users, 1)[0]
    print("起始用户:随机选择 {} 从 {} 个可行的起始用户中".format(
        self.start_user, len(candidate_start_users)
    ))

在生成下一次攻击跳跃时,通过随机选择时间和目标来模拟攻击者的不确定性。

def _get_next_hop(self, logins):
    """辅助方法:选择下一次攻击跳跃。"""
    self.log("生成攻击跳跃 #{}".format(self.attack_history.num_hops))

    # 生成下一次攻击跳跃的移动时间
    next_interarrival = random.randint(1, 3600 * self.interarrival_window_hrs)
    next_time = (
        self.attack_history.last_move_time +
        datetime.timedelta(seconds=next_interarrival)
    )

    # 根据攻击者的能力和知识生成下一跳的候选集
    candidate_next_hops = self.attack_capabilities.get_candidate_next_hops(
        self.attack_history)

    # 根据隐蔽性约束/修剪候选下一跳
    candidate_next_hops = self.attack_stealth.constrain_next_hops(
        candidate_next_hops, self.attack_history)

    # 根据领域知识和威胁模型约束/修剪候选下一跳
    candidate_next_hops = self.attack_constraints.constrain_next_hops(
        candidate_next_hops, self.attack_history)

    # 根据攻击目标选择下一跳
    next_hop = self.attack_goal.select_next_hop(
        candidate_next_hops, self.attack_history, self.attack_capabilities)

    return (next_time, next_hop.src, next_hop.dst, next_hop.user)

总结起来,刚才的代码通过结合预定义的规则和策略、对登录数据的预处理和过滤以及引入随机性来实现威胁增强。它模拟了真实攻击中的各种行为和路径,生成看似真实的攻击事件日志,帮助研究人员和从业者更好地理解和防御网络攻击。