威胁增强
1.1 attack_lab
synthesize_attack
def synthesize_attack(logins, attack_config, start_dt=None):
"""从头合成攻击的方法。
参数:
logins: pd.DataFrame : 每行一个登录事件
attack_config: attack_lib.AttackPathConfig 对象
start_dt: datetime.datetime 最早使用的登录时间
返回:
pd.DataFrame 合成的攻击登录事件
"""
# 确保登录事件有适当的列/字段
columns = logins.columns
assert(
LoginColumns.TIME in columns and
LoginColumns.SRC in columns and
LoginColumns.USER in columns and
LoginColumns.DST in columns and
LoginColumns.DATASET in columns
)
# 根据登录类型限制数据集
if attack_config.protocol == 'ssh':
data = logins[
logins[LoginColumns.DATASET].str.lower().str.contains('ssh')]
elif attack_config.protocol == 'windows':
data = logins[
logins[LoginColumns.DATASET].str.lower().str.contains('windows')]
else:
data = logins
print("使用 {} {} 登录事件进行攻击合成\n".format(
len(data), attack_config.protocol))
# 生成攻击路径
attack_generator = AttackPathGenerator(
attack_config.attack_goal,
start_state=attack_config.start_state,
attacker_knowledge=attack_config.attacker_knowledge,
stealth=attack_config.attack_stealth,
src_preference=attack_config.src_preference,
src_history_hrs=attack_config.src_history_hrs,
compromise_cred_hrs=attack_config.compromise_cred_hrs,
active_cred_hrs=attack_config.active_cred_hrs,
target_machines=attack_config.target_machines,
start_dt=start_dt
)
attack = attack_generator.make_attack(data)
return attack
此函数用于从头开始合成攻击路径。
它接收一个包含登录事件的DataFrame和一个攻击配置对象,返回一个包含合成攻击路径的DataFrame。
is_synthetic_attack_successful
def is_synthetic_attack_successful(attack_df):
"""如果攻击未能切换到新的用户凭据,则攻击不成功。"""
if attack_df is None or len(attack_df) == 0:
return False
users = attack_df[LoginColumns.USER].drop_duplicates()
return len(users) >= 2
此函数用于判断合成攻击是否成功。如果攻击路径中未能切换到新的用户凭据,则认为攻击不成功。
AttackPathGenerator 类
初始化方法
class AttackPathGenerator(LoggingClass):
"""生成不同类型的横向移动路径。"""
VERSION = 0
def __init__(
self, attack_goal, attacker_knowledge, stealth,
start_state=None, compromise_cred_hrs=None,
active_cred_hrs=None, src_history_hrs=None, src_preference=None,
target_machines=set([]), verbose=True, start_dt=None
):
"""初始化攻击路径生成器。
参数:
attack_goal: MovementGoal 中的目标常量
attacker_knowledge: AttackerCapabilities 中的知识常量
stealth: MovementStealth 中的隐蔽性常量
state_state: utils.AttackStart 对象
compromise_cred_hrs: (int) 凭据在最后一次用户登录后仍然可被利用的小时数
active_cred_hrs: (int) 检测器用于因果链接两次登录事件的小时数,适用于隐蔽攻击
src_history_hrs: (int) 攻击者可以挖掘的机器最近登录历史的小时数
src_preference: MovementConstraints 中的 SRC_PREF 常量
target_machines: 目标攻击的高价值主机名集合(字符串)
"""
super(AttackPathGenerator, self).__init__(verbose=verbose)
self.real_users = get_all_users()
# 随机采样间隔时间(秒)
# 在 [0, self.interarrival_window_hrs] 之间,为下一次攻击跳跃设置时间间隔
self.interarrival_window_hrs = 2
self.start_dt = start_dt
self.attack_start = start_state
self.attack_history = None
self.attack_capabilities = AttackerCapabilities(
attacker_knowledge, compromise_cred_hrs)
self.attack_constraints = MovementConstraints(src_preference)
self.attack_stealth = MovementStealth(
stealth, active_cred_hrs=active_cred_hrs,
src_history_hrs=src_history_hrs)
self.attack_goal = MovementGoal(
attack_goal, target_machines=target_machines)
该类用于生成不同类型的横向移动攻击路径。
初始化方法设置了攻击的目标、攻击者的知识、隐蔽性、起始状态、凭据暴露时间、活动凭据时间、源历史时间、源偏好以及目标机器等。
make_attack 方法
def make_attack(self, logins):
"""生成攻击登录事件的主方法。"""
logins = self._preprocess_logins(logins)
# 初始化起始跳跃
self._initialize_start(logins)
# 迭代生成下一个跳跃,直到达到攻击目标,或攻击失去选择
while not self.attack_goal.is_attack_complete(self.attack_history):
next_time, next_src, next_dst, next_user = self._get_next_hop(logins)
if next_dst is None:
# 如果没有更多的机器可以移动到,则终止
self.log("\n警告:攻击已失去潜在目标。终止!!!\n")
break
else:
# 打印进度信息
self.log("选择下一个跳跃:"
"(下一跳时间 = {}, 源 = {}, 目标 = {}, 用户 = {}。".format(
next_time, next_src, next_dst, next_user
))
# 合成一个新的登录事件,给定
# (1) 起始机器,(2) 用户,(3) 目标机器
new_hop = LoginSynthesizer().synthesize_login(
logins, next_time, next_src, next_dst, next_user)
new_hop.loc[:, LoginColumns.ATTACK] = True
# 进行横向移动到新的目标并更新状态
self._make_next_hop(new_hop, logins)
return self.attack_history.attack_path.reset_index(drop=True)
此方法是生成攻击登录事件的主方法。它先预处理登录事件,然后初始化起始跳跃,接着迭代生成下一个跳跃,直到达到攻击目标或攻击失去选择。
_preprocess_logins 方法
def _preprocess_logins(self, logins):
"""辅助方法:预处理登录事件以生成攻击。"""
logins = logins[
(logins[LoginColumns.TIME] >= self.start_dt)
]
# 过滤只包含真实用户的登录事件
logins = logins[logins[LoginColumns.USER].isin(self.real_users)]
# 过滤掉来自忽略集机器的登录事件
# 添加你想应用的其他过滤
return logins
此方法预处理登录事件以生成攻击。它过滤出开始时间之后的登录事件,并只保留包含真实用户的登录事件。
_engineer_attack_start 方法
def _engineer_attack_start(self, logins):
"""根据隐蔽性工程攻击起点以确保成功。"""
print("工程攻击起点。\n")
i = 0
while self.attack_start is None and i < 100:
i += 1
try:
# 根据指定的攻击类型工程攻击起始状态
if self.attack_stealth.stealth == MovementStealth.STEALTH_ACTIVE_CREDS:
self.attack_start = AttackStart(AttackStart.START_AMBIG_PATH)
elif self.attack_stealth.stealth == MovementStealth.STEALTH_ENDPOINTS:
self.attack_start = AttackStart(AttackStart.START_LONG_PATH)
elif self.attack_stealth.stealth == MovementStealth.STEALTH_FULL:
self.attack_start = AttackStart(AttackStart.START_STEALTH_PATH)
else:
self.attack_start = AttackStart()
self.attack_start.initialize(logins)
except:
self.attack_start = None
print("攻击起点生成失败:{}".format(i))
此方法根据攻击的隐蔽性来工程攻击起点,以确保攻击成功。它尝试不同的起始状态,直到找到合适的起点或尝试次数达到上限。
_initialize_start 方法
def _initialize_start(self, logins):
"""辅助方法:选择初始妥协起点和时间。"""
# 初始化攻击起点
if self.attack_start is None:
i = 0
print("随机化攻击起点。\n")
self.attack_start = AttackStart()
self.attack_start.initialize(logins)
else:
print("预先指定的起始状态:{}\t{}\t{}".format(
self.attack_start.start_time, self.attack_start.start_src,
self.attack_start.start_user
))
# 初始化起始状态以填补任何未指定的值
self.attack_start.initialize(logins)
# 初始化目标
self.attack_goal.target_info.initialize(
self.attack_start.start_src, self.attack_start.start_time, logins)
# 填充攻击历史
self.attack_history = AttackHistory(self.attack_start)
# 初始化攻击者的能力
compromised_users = set([self.attack_start.start_user,])
foothold = self.attack_start.start_src
self.attack_capabilities.initialize_capabilities(self.attack_start, logins)
self.attack_stealth.update_knowledge(
self.attack_start.start_time, foothold, logins)
self.attack_history.update_compromised_creds(foothold, compromised_users)
self.attack_goal.update_progress(foothold, compromised_users)
此方法选择初始妥协起点和时间。它初始化攻击起点、目标、攻击历史和攻击者的能力。
_make_next_hop 方法
def _make_next_hop(self, new_hop, logins):
"""辅助方法:基于攻击移动到下一跳更新状态。"""
self.log("移动到新的攻击边:")
self.log(list(new_hop[LOGIN_ANALYSIS_COLUMNS].itertuples(index=False))[0])
new_time = new_hop[LoginColumns.TIME].iloc[0]
new_dst = new_hop[LoginColumns.DST].iloc[0]
compromised_users = self.attack_capabilities.update_capabilities(
new_time, new_dst, logins)
self.attack_stealth.update_knowledge(new_time, new_dst, logins)
self.attack_history.add_new_hop(new_hop)
self.attack_history.update_compromised_creds(new_dst, compromised_users)
self.attack_goal.update_progress(new_dst, compromised_users)
此方法基于攻击移动到下一跳更新状态。它更新攻击者的能力、知识、攻击历史和攻击目标的进展。
_get_next_hop 方法
def _get_next_hop(self, logins):
"""辅助方法:选择下一次攻击跳跃。"""
self.log("生成攻击跳跃 #{}".format(self.attack_history.num_hops))
# 生成下一次攻击跳跃的移动时间
next_interarrival = random.randint(1, 3600 * self.interarrival_window_hrs)
next_time = (
self.attack_history.last_move_time +
datetime.timedelta(seconds=next_interarrival)
)
# 根据攻击者的能力和知识生成下一跳的候选集
candidate_next_hops = self.attack_capabilities.get_candidate_next_hops(
self.attack_history)
# 根据隐蔽性约束/修剪候选下一跳
candidate_next_hops = self.attack_stealth.constrain_next_hops(
candidate_next_hops, self.attack_history)
# 根据领域知识和威胁模型约束/修剪候选下一跳
candidate_next_hops = self.attack_constraints.constrain_next_hops(
candidate_next_hops, self.attack_history)
# 根据攻击目标选择下一跳
next_hop = self.attack_goal.select_next_hop(
candidate_next_hops, self.attack_history, self.attack_capabilities)
return (next_time, next_hop.src, next_hop.dst, next_hop.user)
此方法选择下一次攻击跳跃。它生成下一次攻击跳跃的移动时间,根据攻击者的能力和知识生成候选集,并根据隐蔽性和领域知识修剪候选集,最终选择下一次攻击跳跃。
AttackPathConfig 类
class AttackPathConfig(LoggingClass):
"""封装攻击路径配置。"""
def __init__(
self, attack_goal, attacker_knowledge, stealth, protocol,
start_state=None,
src_preference=MovementConstraints.SRC_PREF_NONE,
compromise_cred_hrs=AttackerCapabilities.DEFAULT_CRED_EXPOSED_HRS,
active_cred_hrs=MovementStealth.ACTIVE_CRED_HRS,
src_history_hrs=MovementStealth.DEFAULT_SRC_HISTORY_HRS,
target_machines=set([])
):
"""初始化攻击路径生成器。"""
self.attack_goal = attack_goal
self.attacker_knowledge = attacker_knowledge
self.attack_stealth = stealth
self.protocol = protocol
self.start_state = start_state
self.src_preference = src_preference
self.compromise_cred_hrs = compromise_cred_hrs
self.active_cred_hrs = active_cred_hrs
self.src_history_hrs = src_history_hrs
self.target_machines = target_machines
def get_file_suffix(self):
"""获取描述此攻击配置的后缀。"""
suffix = ".{}.{}.{}.protocol={}.df"
suffix = suffix.format(
self.attack_goal, self.attack_stealth,
self.attacker_knowledge, self.protocol
)
return suffix
def __str__(self):
"""返回攻击配置的字符串表示。"""
if self.start_state:
start_str = "攻击起点:时间 = {}, 源 = {}, 用户 = {}, 工程 = {}。".format(
self.start_state.start_time, self.start_state.start_src,
self.start_state.start_user, self.start_state.start_strategy
)
else:
start_str = "攻击起点:未指定。"
main_str = ("攻击目标:{}。\t攻击者知识:{}。\t攻击隐蔽性:{}。"
"\t登录协议:{}\t源偏好:{}。\t目标机器:{}".format(
self.attack_goal, self.attacker_knowledge, self.attack_stealth,
self.protocol, self.src_preference, self.target_machines
))
auxil_str = (
"凭据暴露时间窗口:{} 小时。"
"\t活动凭据时间窗口:{} 小时。"
"\t源-目标历史时间窗口:{} 小时。"
).format(self.compromise_creds_hrs, self.active_cred_hrs, self.src_history_hrs)
final_str = "{}\n{}\n{}".format(start_str, main_str, auxil_str)
return final_str
该类封装了攻击路径的配置。初始化方法设置了攻击的目标、攻击者的知识、隐蔽性、协议、起始状态、源偏好、凭据暴露时间、活动凭据时间、源历史时间和目标机器等。
1.2 data_type
这段代码定义了一些常量类、数据列类和基础类,以支持网络攻击模拟中的数据处理和日志记录。
- 常量类 定义了数据列名、移动类型和攻击场景中的常量。
- LoggingClass 提供了简单的日志记录机制。
- AttackHistory 跟踪攻击路径的历史和状态,管理已访问的目标机器和被妥协的凭据。
- AttackNextHop 命名元组用于结构化表示攻击路径中的下一跳。
定义常量和数据列类
LoginColumns 类
class LoginColumns(object):
"""Pandas数据的列名常量。"""
TIME = "time"
INDEX_TIME = 'indextime'
SRC = "src"
DST = "dst"
USER = "user"
MOVEMENT_TYPE = "movement_type"
PROTOCOL = 'protocol' # ssh vs. Windows
DATASET = "dataset"
ATTACK = 'is_attack' # groundtruth label: attack == True, else False
ATTACK_ID = 'attack_id' # ID for synthetic attack this corresponds to
LoginColumns
类定义了一组常量,表示登录数据中常用的列名。这些常量使得代码更具可读性和维护性。
EnrichmentColumns 类
class EnrichmentColumns(object):
"""定义中间/丰富相关列的类。"""
SRC_SUBNET = 'src_subnet'
SRC_LOCATION = "src_location"
LOCATION = "location"
MACHINE_AGE = "machine_age"
MACHINE_EARLIEST_DATE = "machine_first_date"
NUM_INBOUND_DAYS = "src_n_days_recv_inbound_success"
SRC_CLIENT = "is_src_client"
DST_CLIENT = "is_dst_client"
SRC_OWNER = "owner"
USER_TEAM = "user_team"
USER_AGE = "user_age"
EnrichmentColumns
类定义了一组常量,表示用于丰富数据的中间列名。
MovementTypes 类
class MovementTypes(object):
MOVE_FROM_CLIENT = "movement:client-server"
MOVE_INTO_CLIENT = "movement:into-client"
MOVE_FROM_SERVER = "movement:server-server"
MovementTypes
类定义了几种移动类型,用于描述网络中节点之间的移动方式。
ScenarioConstants 类
class ScenarioConstants(object):
GOAL_EXPLORATION = "goal=exploration"
GOAL_SPREAD = "goal=aggressive-spread"
GOAL_TARGETED = "goal=targeted"
STEALTH_NONE = "stealth=agnostic"
STEALTH_ENDPOINTS = "stealth=only-prev-src-dst-combos"
STEALTH_ACTIVE_CREDS = "stealth=only-active-src-user-combos"
STEALTH_FULL = "stealth=full-stealthiness"
ScenarioConstants
类定义了几种攻击目标和隐蔽性策略的常量。这些常量用于配置攻击模拟场景。
基础类
LoggingClass 类
class LoggingClass(object):
"""启用不同的日志输出。"""
def __init__(self, verbose=True):
self.verbose = verbose
def log(self, msg):
"""辅助方法:根据是否详细模式记录消息。"""
if self.verbose:
print(msg)
LoggingClass
类提供了一个简单的日志记录机制,根据 verbose
参数决定是否打印日志消息。
AttackHistory 类
class AttackHistory(LoggingClass):
"""跟踪攻击的历史和状态。"""
def __init__(self, start_state, verbose=True):
super(AttackHistory, self).__init__(verbose=verbose)
self.start_state = start_state
# 跟踪攻击历史
self.attack_path = pd.DataFrame()
self.num_hops = 1
# 记录每个目标机器上的被妥协凭据
self.compromised_creds_per_dst = dict()
self.visited_dst = set([self.start_state.start_src,])
self.cur_machine = self.start_state.start_src
self.cur_user = self.start_state.start_user
self.last_move_time = self.start_state.start_time
def get_start_accessible_dst(self):
"""获取起始用户有权限访问的机器集合。"""
return self.start_state.start_accessible_dst
def get_start_src(self):
return self.start_state.start_src
def get_start_user(self):
return self.start_state.start_user
def get_current_user(self):
return self.cur_user
def get_current_machine(self):
return self.cur_machine
def get_visited_dst(self):
return self.visited_dst
def add_new_hop(self, new_hop):
"""用新的跳跃更新攻击历史。"""
self.attack_path = pd.concat([self.attack_path, new_hop], sort=False)
self.cur_user = new_hop[LoginColumns.USER].iloc[0]
new_machine = new_hop[LoginColumns.DST].iloc[0]
self.cur_machine = new_machine
self.visited_dst.add(new_machine)
self.last_move_time = new_hop[LoginColumns.TIME].iloc[0]
self.num_hops += 1
def update_compromised_creds(self, machine, compromised_creds):
"""更新每台机器上的被妥协凭据。"""
self.compromised_creds_per_dst[machine] = self.compromised_creds_per_dst.get(
machine, set([])
) | compromised_creds
AttackHistory
类跟踪攻击的历史和状态,包括攻击路径、已访问的目标机器、当前机器和用户等。它还提供了方法来更新攻击历史和妥协凭据。
AttackNextHop 命名元组
AttackNextHop = namedtuple(
'AttackNextHop', [LoginColumns.SRC, LoginColumns.DST, LoginColumns.USER])
AttackNextHop
是一个命名元组,用于表示攻击路径中的下一跳,包括源、目标和用户。它提供了一种结构化方式来传递攻击路径中的信息。
1.3 attack_capabilities
下面是对代码中每个函数的详细分析:
AttackerCapabilities 类
初始化方法
class AttackerCapabilities(LoggingClass):
"""在攻击过程中封装攻击者的能力。"""
DEFAULT_CRED_EXPOSED_HRS = 24*7 # 默认凭据暴露时间为7天
KNOWLEDGE_LOCAL = "knowledge=local" # 攻击者只知道他们移动到的机器的历史
KNOWLEDGE_GLOBAL = "knowledge=global" # 攻击者知道整个网络拓扑
def __init__(self, knowledge, compromise_cred_hrs=None, verbose=True):
"""
参数:
knowledge: KNOWLEDGE_ 类常量
compromise_cred_hrs: (int) 凭据在最后一次用户登录后仍然可被利用的小时数
"""
super(AttackerCapabilities, self).__init__(verbose=verbose)
self.knowledge = knowledge
self.dst_per_compromised_user = dict() # 记录每个被妥协用户可访问的目标
self.real_users = get_all_users() # 获取所有真实用户
# 设置凭据暴露时间
self.compromise_cred_hrs = compromise_cred_hrs
if self.compromise_cred_hrs is None:
self.compromise_cred_hrs = self.DEFAULT_CRED_EXPOSED_HRS
self.log("攻击者能力设置为: {}\t凭据暴露窗口={} 小时\n".format(
self.knowledge, self.compromise_cred_hrs
))
get_accessible_dst_for_user
@classmethod
def get_accessible_dst_for_user(cls, logins, user, protocol=None):
"""获取用户可以访问的所有目标。
通过构建用户在整个数据集中访问过的目标集合进行近似。
"""
matching_logins = logins[
(logins[LoginColumns.USER] == user)
]
if protocol is None or not (LoginColumns.PROTOCOL in logins.columns):
pass
else:
matching_logins = matching_logins[
matching_logins[LoginColumns.PROTOCOL] == protocol
]
# 目前,可访问的目标 = 用户成功访问过的目标
# 这是用户实际权限/可访问目标的子集
prior_dst = set(matching_logins[LoginColumns.DST].drop_duplicates())
prior_dst = prior_dst - UNINTERESTING_DST
return prior_dst
这个方法获取用户在过去的登录中访问过的所有目标节点,并返回这些目标节点的集合。
initialize_capabilities
def initialize_capabilities(self, start_state, logins):
"""初始化攻击者的能力。"""
self._update_candidate_dst(logins, start_state.start_user)
这个方法初始化攻击者的能力,具体通过更新初始用户可访问的目标节点集合。
update_capabilities
def update_capabilities(self, new_time, new_dst, logins):
"""更新能力集:被妥协的凭据集和目标集。
参数:
new_time: datetime.datetime 新的时间点
new_dst: str 新的目标节点
logins: pd.DataFrame 所有考虑的登录事件
"""
compromised_users = self._get_new_compromised_users(
logins, new_time, new_dst)
for (user, protocol) in compromised_users:
self._update_candidate_dst(logins, user, protocol)
compromised_users = set([u for (u, protocol) in compromised_users])
return compromised_users
这个方法在攻击者成功移动到新目标节点后更新其能力集,包括被妥协的凭据和可访问的目标节点。
get_candidate_next_hops
def get_candidate_next_hops(self, attack_history):
"""生成所有可能的移动跳跃。
参数:
attack_history: data_types.AttackHistory 对象
返回:
[AttackNextHop namedtuple 的列表]
"""
candidate_hops = []
candidate_srcs = self._get_candidate_src(attack_history)
self.log("候选跳跃: 候选源 = {}".format(candidate_srcs))
next_hops = flatten_list([
[AttackNextHop(src, dst, user) for src in candidate_srcs]
for (user, dst) in self._get_candidate_user_dst()
])
self.log("候选跳跃: 生成了 {} 个可能的跳跃".format(len(next_hops)))
return next_hops
这个方法生成所有可能的移动跳跃,即攻击者可能的下一步行动。它首先获取候选源节点,然后生成每个用户-目标节点对的所有可能跳跃。
_get_candidate_src
def _get_candidate_src(self, attack_history):
"""辅助方法:获取下一跳的候选源 = 所有访问过的机器。"""
return attack_history.visited_dst
这个方法返回攻击历史中所有访问过的目标节点,作为下一跳的候选源节点。
_get_candidate_user_dst
def _get_candidate_user_dst(self):
"""辅助方法:获取可访问的 (user, dst) 移动对。"""
user_and_dst_pairs = flatten_list([
[(user, dst) for dst in self.dst_per_compromised_user[user]]
for user in self.dst_per_compromised_user
])
self.log("候选跳跃: {} 个候选 (user, dst) 对".format(len(user_and_dst_pairs)))
return user_and_dst_pairs
这个方法获取所有可访问的用户-目标节点对,用于生成候选跳跃。
_get_new_compromised_users
def _get_new_compromised_users(self, logins, time, host):
"""辅助方法:模拟攻击者在主机上妥协凭据。
返回:
set([tuple(user, protocol)])
"""
if host in NON_COMPROMISE_HOSTS or not is_compromisable_host(host):
self.log(
"更新被妥协的凭据:\t主机 = {} 是不可妥协的主机,跳过凭据妥协".format(host))
return set([])
# 确定最近登录到新妥协主机的所有登录
lower_bound = time - datetime.timedelta(hours=self.compromise_cred_hrs)
vuln_logins = logins[
(logins[LoginColumns.DST] == host) &
(logins[LoginColumns.TIME] <= time) &
(logins[LoginColumns.TIME] >= lower_bound)
]
# 确定最近登录到主机的所有用户(可能缓存的凭据)
vuln_users = [
(r[LoginColumns.USER], r[LoginColumns.PROTOCOL]) for idx, r in
vuln_logins[
[LoginColumns.USER, LoginColumns.PROTOCOL]
].drop_duplicates().iterrows()
]
# '妥协'所有真实用户名
compromised_users = set([
(u, protocol) for (u, protocol) in vuln_users if u in self.real_users])
self.log("更新被妥协的凭据:\t"
"主机 = {} 在过去 {} 小时内有 {} 个用户登录。\n"
"妥协的(真实)用户集 = {}".format(
host, len(vuln_users),
self.compromise_cred_hrs, compromised_users
))
return compromised_users
这个方法模拟攻击者在目标主机上妥协凭据。它识别最近登录到目标主机的用户,并返回这些用户和协议的集合。
_update_candidate_dst
def _update_candidate_dst(self, logins, user, protocol=None):
"""辅助方法:更新攻击者可以移动到的目标集。"""
if user in self.dst_per_compromised_user:
# 节省计算:我们可以忽略更新目标集
return
# 确定用户(凭据)以前访问过的目标 = 候选目标
prior_dst = self.get_accessible_dst_for_user(logins, user, protocol=protocol)
# 记录用户的凭据提供访问的目标
self.dst_per_compromised_user[user] = \
prior_dst | self.dst_per_compromised_user.get(user, set([]))
self.log("使用 {} 个目标更新候选目标 "
"(示例: {}) 为被妥协用户 = {}。\n".format(
len(prior_dst), safe_rand_sample(prior_dst, 5), user
))
这个方法更新攻击者可以移动到的目标集。它识别用户以前访问过的目标节点,并将这些节点加入候选目标集中。
总结
AttackerCapabilities
类封装了攻击者在攻击过程中拥有的资源和能力。它包括初始化攻击者的能力、更新妥协凭据和目标集、生成候选跳跃等功能。这些方法使得攻击模拟更加真实和复杂,能够更好地评估网络防御机制的有效性。
1.4 attack_lab架构解析
Python 文件架构分析及函数之间的依赖关系
这个 Python 文件主要用于模拟和合成网络攻击路径,包括生成初始妥协点、生成攻击路径和评估攻击是否成功。
- synthesize_attack 函数:主函数,用于合成攻击路径。
- is_synthetic_attack_successful 函数:判断合成攻击是否成功。
- AttackPathGenerator 类:生成攻击路径的核心类。
- AttackPathConfig 类:封装攻击路径配置。
函数和类之间的依赖关系
synthesize_attack 函数
synthesize_attack
函数是整个攻击合成过程的入口点。它依赖于以下内容:
- AttackPathConfig 类:传入的
attack_config
对象,用于配置攻击路径的各种参数。 - AttackPathGenerator 类:用于生成攻击路径的核心类,实例化后调用其
make_attack
方法生成攻击路径。
is_synthetic_attack_successful 函数
def is_synthetic_attack_successful(attack_df):
"""如果攻击未能切换到新的用户凭据,则攻击不成功。"""
if attack_df is None或len(attack_df) == 0:
return False
users = attack_df[LoginColumns.USER].drop_duplicates()
return len(users) >= 2
is_synthetic_attack_successful
函数用于判断合成的攻击是否成功。它依赖于:
attack_df
:由synthesize_attack
函数生成的攻击路径数据帧。
AttackPathGenerator 类
AttackPathGenerator
类是生成攻击路径的核心类。其主要方法和内部依赖如下:
- 初始化方法:
class AttackPathGenerator(LoggingClass):
"""生成不同类型的横向移动路径。"""
VERSION = 0
def __init__(
self, attack_goal, attacker_knowledge, stealth,
start_state=None, compromise_cred_hrs=None,
active_cred_hrs=None, src_history_hrs=None, src_preference=None,
target_machines=set([]), verbose=True, start_dt=None
):
super(AttackPathGenerator, self).__init__(verbose=verbose)
self.real_users = get_all_users()
self.interarrival_window_hrs = 2
self.start_dt = start_dt
self.attack_start = start_state
self.attack_history = None
self.attack_capabilities = AttackerCapabilities(
attacker_knowledge, compromise_cred_hrs)
self.attack_constraints = MovementConstraints(src_preference)
self.attack_stealth = MovementStealth(
stealth, active_cred_hrs=active_cred_hrs,
src_history_hrs=src_history_hrs)
self.attack_goal = MovementGoal(
attack_goal, target_machines=target_machines)
AttackerCapabilities
:攻击者的能力,决定了可以执行哪些攻击动作。MovementConstraints
:攻击者的约束,决定了攻击路径的生成规则。MovementStealth
:攻击的隐蔽性配置。MovementGoal
:攻击的目标配置。
- make_attack 方法:
def make_attack(self, logins):
"""生成攻击登录事件的主方法。"""
logins = self._preprocess_logins(logins)
self._initialize_start(logins)
while not self.attack_goal.is_attack_complete(self.attack_history):
next_time, next_src, next_dst, next_user = self._get_next_hop(logins)
if next_dst is None:
self.log("\n警告:攻击已失去潜在目标。终止!!!\n")
break
else:
self.log("选择下一个跳跃:"
"(下一跳时间 = {}, 源 = {}, 目标 = {}, 用户 = {}。".format(
next_time, next_src, next_dst, next_user
))
new_hop = LoginSynthesizer().synthesize_login(
logins, next_time, next_src, next_dst, next_user)
new_hop.loc[:, LoginColumns.ATTACK] = True
self._make_next_hop(new_hop, logins)
return self.attack_history.attack_path.reset_index(drop=True)
LoginSynthesizer
:用于合成新的登录事件。_preprocess_logins
、_initialize_start
、_get_next_hop
、_make_next_hop
等内部方法:用于处理登录事件、初始化攻击起点、生成下一次攻击跳跃并更新攻击历史。
AttackPathConfig 类
class AttackPathConfig(LoggingClass):
"""封装攻击路径配置。"""
def __init__(
self, attack_goal, attacker_knowledge, stealth, protocol,
start_state=None,
src_preference=MovementConstraints.SRC_PREF_NONE,
compromise_cred_hrs=AttackerCapabilities.DEFAULT_CRED_EXPOSED_HRS,
active_cred_hrs=MovementStealth.ACTIVE_CRED_HRS,
src_history_hrs=MovementStealth.DEFAULT_SRC_HISTORY_HRS,
target_machines=set([])
):
self.attack_goal = attack_goal
self.attacker_knowledge = attacker_knowledge
self.attack_stealth = stealth
self.protocol = protocol
self.start_state = start_state
self.src_preference = src_preference
self.compromise_cred_hrs = compromise_cred_hrs
self.active_cred_hrs = active_cred_hrs
self.src_history_hrs = src_history_hrs
self.target_machines = target_machines
- 该类用于封装攻击路径配置,包含攻击目标、攻击者知识、隐蔽性、协议、起始状态等。
函数和类的相互依赖关系总结
- synthesize_attack 依赖 AttackPathConfig 来配置攻击路径,并使用 AttackPathGenerator 来生成攻击路径。
- AttackPathGenerator 依赖 AttackerCapabilities、MovementConstraints、MovementStealth 和 MovementGoal 等类来生成攻击路径。
- is_synthetic_attack_successful 依赖 synthesize_attack 生成的攻击路径数据帧来判断攻击是否成功。
实现威胁增强
威胁增强通过以下步骤实现:
- 配置攻击路径:使用
AttackPathConfig
配置攻击路径的目标、攻击者的知识、隐蔽性等参数。 - 生成攻击路径:使用
AttackPathGenerator
生成攻击路径,具体包括初始化起点、生成每个跳跃、更新攻击历史等。 - 合成攻击事件:在每个攻击跳跃中,使用
LoginSynthesizer
合成新的登录事件,模拟攻击者的行为。 - 评估攻击成功与否:使用
is_synthetic_attack_successful
判断生成的攻击路径是否成功。
这种威胁增强方法通过模拟真实攻击路径,生成具有代表性的攻击数据,帮助网络安全研究人员和从业人员评估和改进安全防护机制。
1.5 特点
-
规则和策略:根据预定义的攻击目标、攻击者知识、隐蔽性等策略,模拟真实攻击中的横向移动路径。这些策略和规则帮助生成看似真实的攻击路径。
-
数据处理和过滤:通过对登录数据的预处理和过滤,确保选择的初始妥协点和后续攻击路径符合实际场景中的攻击模式。例如,过滤掉系统管理员、确保用户和机器有足够的历史数据等。
-
随机性:在选择初始妥协点、下一跳攻击目标等过程中引入随机性,以模拟攻击者的不确定行为。这种随机性帮助生成多样化的攻击路径,提高模拟攻击的真实性。
代码实现威胁增强的具体方法
1. 规则和策略
代码通过 AttackPathConfig
类来配置攻击路径的目标、攻击者的知识、隐蔽性等参数,这些参数会影响攻击路径的生成规则和策略。
class AttackPathConfig(LoggingClass):
"""封装攻击路径配置。"""
def __init__(
self, attack_goal, attacker_knowledge, stealth, protocol,
start_state=None,
src_preference=MovementConstraints.SRC_PREF_NONE,
compromise_cred_hrs=AttackerCapabilities.DEFAULT_CRED_EXPOSED_HRS,
active_cred_hrs=MovementStealth.ACTIVE_CRED_HRS,
src_history_hrs=MovementStealth.DEFAULT_SRC_HISTORY_HRS,
target_machines=set([])
):
"""初始化攻击路径生成器。"""
self.attack_goal = attack_goal
self.attacker_knowledge = attacker_knowledge
self.attack_stealth = stealth
self.protocol = protocol
self.start_state = start_state
self.src_preference = src_preference
self.compromise_cred_hrs = compromise_cred_hrs
self.active_cred_hrs = active_cred_hrs
self.src_history_hrs = src_history_hrs
self.target_machines = target_machines
2. 数据处理和过滤
代码通过 AttackPathGenerator
类中的 _preprocess_logins
方法来预处理登录数据,确保选择的初始妥协点和攻击路径符合实际场景中的攻击模式。
def _preprocess_logins(self, logins):
"""辅助方法:预处理登录事件以生成攻击。"""
logins = logins[
(logins[LoginColumns.TIME] >= self.start_dt)
]
# 过滤只包含真实用户的登录事件
logins = logins[logins[LoginColumns.USER].isin(self.real_users)]
# 过滤掉来自忽略集机器的登录事件
# 添加你想应用的其他过滤
return logins
3. 随机性
在选择初始妥协点、下一跳攻击目标等过程中引入随机性,以模拟攻击者的不确定行为。
def _select_start_user_random(self, logins, candidate_start_users=None):
"""选择随机起始用户。"""
# 起始妥协机器应该是客户端
logins = logins[logins[EnrichmentColumns.SRC_CLIENT] == True]
if self.start_time:
logins = logins[
(logins[LoginColumns.TIME] >= self.start_time - datetime.timedelta(hours=24)) &
(logins[LoginColumns.TIME] <= self.start_time + datetime.timedelta(hours=24))
]
# 随机选择一个起始受害者/妥协用户
if not candidate_start_users:
candidate_start_users = self._get_viable_start_users(logins)
self.start_user = random.sample(candidate_start_users, 1)[0]
print("起始用户:随机选择 {} 从 {} 个可行的起始用户中".format(
self.start_user, len(candidate_start_users)
))
在生成下一次攻击跳跃时,通过随机选择时间和目标来模拟攻击者的不确定性。
def _get_next_hop(self, logins):
"""辅助方法:选择下一次攻击跳跃。"""
self.log("生成攻击跳跃 #{}".format(self.attack_history.num_hops))
# 生成下一次攻击跳跃的移动时间
next_interarrival = random.randint(1, 3600 * self.interarrival_window_hrs)
next_time = (
self.attack_history.last_move_time +
datetime.timedelta(seconds=next_interarrival)
)
# 根据攻击者的能力和知识生成下一跳的候选集
candidate_next_hops = self.attack_capabilities.get_candidate_next_hops(
self.attack_history)
# 根据隐蔽性约束/修剪候选下一跳
candidate_next_hops = self.attack_stealth.constrain_next_hops(
candidate_next_hops, self.attack_history)
# 根据领域知识和威胁模型约束/修剪候选下一跳
candidate_next_hops = self.attack_constraints.constrain_next_hops(
candidate_next_hops, self.attack_history)
# 根据攻击目标选择下一跳
next_hop = self.attack_goal.select_next_hop(
candidate_next_hops, self.attack_history, self.attack_capabilities)
return (next_time, next_hop.src, next_hop.dst, next_hop.user)
总结起来,刚才的代码通过结合预定义的规则和策略、对登录数据的预处理和过滤以及引入随机性来实现威胁增强。它模拟了真实攻击中的各种行为和路径,生成看似真实的攻击事件日志,帮助研究人员和从业者更好地理解和防御网络攻击。