banner
NEWS LETTER

Web攻击检测与分类识别

Scroll down

Web攻击检测与分类识别

1. 赛题内容

本赛题总共提供了33219条训练样本,4000条测试样本。训练样本中总共有6类数据标签,分别为0(白标签)、1(SQL注入)、2(目录遍历)、3(远程代码执行)、4(命令执行)、5(XSS跨站脚本)。
其中标签为0的数据代表正常Http报文,类别1到5为不同种类的攻击报文。赛题的目标是建立多分类模型精准识别出6个类别。

2. 解决思路以及代码

(1)数据处理

  • 对于url进行处理,对url进行解码处理,有利于模型更好的理解。提取查询参数的长度,长度的最大值以及标准差。
  • 代码:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    import re
    from urllib.parse import urlparse, unquote
    import numpy as np

    def get_url_query(s):
    """
    从 URL 字符串中提取查询参数列表。

    Args:
    s (str): URL 字符串。

    Returns:
    list: 提取的查询参数列表。
    """
    # 使用 urlparse 函数解析 URL,然后取出第 4 个部分,即查询参数部分
    li = re.split('[=&]', urlparse(s)[4])
    # 返回奇数位置的元素,即查询参数的值
    return [li[i] for i in range(len(li)) if i % 2 == 1]

    def find_max_str_length(x):
    """
    计算字符串列表中的最大字符串长度。

    Args:
    x (list): 字符串列表。

    Returns:
    int: 最大字符串长度。
    """
    # 计算字符串列表中各个字符串的长度,并取最大值
    return max(len(i) for i in x) if len(x) > 0 else 0

    def find_min_str_length(x):
    """
    计算字符串列表中的最小字符串长度。

    Args:
    x (list): 字符串列表。

    Returns:
    int: 最小字符串长度。
    """
    # 计算字符串列表中各个字符串的长度,并取最小值
    return min(len(i) for i in x) if len(x) > 0 else 0

    def find_str_length_std(x):
    """
    计算字符串列表中的字符串长度的标准差。

    Args:
    x (list): 字符串列表。

    Returns:
    float: 字符串长度的标准差。
    """
    # 计算字符串列表中各个字符串的长度的标准差
    return np.std([len(i) for i in x]) if len(x) > 0 else -1


    # 对 URL 进行解码,并提取查询参数列表
    df['url_unquote'] = df['url'].apply(unquote)
    df['url_query'] = df['url_unquote'].apply(lambda x: get_url_query(x))

    # 计算查询参数列表的长度、最大长度、最小长度和标准差
    df['url_query_num'] = df['url_query'].apply(len)
    df['url_query_max_len'] = df['url_query'].apply(find_max_str_length)
    df['url_query_min_len'] = df['url_query'].apply(find_min_str_length)
    df['url_query_len_std'] = df['url_query'].apply(find_str_length_std)

(2)特征处理

  • tfidf

    tfidf最后得到的是一个稀疏矩阵,稀疏矩阵中的数值是通过 TF-IDF(Term Frequency-Inverse Document Frequency)算法计算得到的。TF-IDF 是一种统计方法,用于衡量一个词语在文档集合中的重要程度。它综合考虑了词频(Term Frequency)和逆文档频率(Inverse Document Frequency)两个因素,
    tf 表示词频(某单词在某文本中的出现次数/该文本中所有词的词数),idf表示逆文本频率(语料库中包含某单词的文本数、的倒数、取log),tf-idf则表示词频 * 逆文档频率,tf-idf认为词的重要性随着它在文本中出现的次数成正比增加,但同时会随着它在整个语料库中出现的频率成反比下降
    稀疏矩阵中的数值是通过 TF-IDF(Term Frequency-Inverse Document Frequency)算法计算得到的。

    具体地说,稀疏矩阵中的每个元素表示一个词语在文档中的 TF-IDF 值。TF-IDF 是一种统计方法,用于衡量一个词语在文档集合中的重要程度。它综合考虑了词频(Term Frequency)和逆文档频率(Inverse Document Frequency)两个因素:

    1. Term Frequency(词频): 表示某个词语在当前文档中出现的频率。TF 的计算公式为:

      $$\text{TF}(t, d) = \frac{f_{t, d}}{\sum_{i \in d} f_{i, d}}$$

      其中,$f_{t, d}$表示词语 $t$在文档$d$中的出现次数,分母部分是当前文档中所有词语的出现次数之和。

    2. Inverse Document Frequency(逆文档频率): 表示包含某个词语的文档数目的倒数的对数。IDF 的计算公式为:
      $$\text{IDF}(t) = \log\left(\frac{N}{df_t}\right)$$

      其中,$N$表示文档集合中文档的总数,$df_t$ 表示包含词语 $t$ 的文档数目。

    3. TF-IDF 值: TF-IDF 值是词频和逆文档频率的乘积,用于衡量词语在当前文档中的重要性。TF-IDF 的计算公式为:

      $$\text{TF-IDF}(t, d) = \text{TF}(t, d) \times \text{IDF}(t)$$

      通过以上计算,稀疏矩阵中的每个元素就得到了相应的 TF-IDF 值,代表了对应词语在当前文档中的重要程度。

  • svd分解

    TFIDF处理之后的维度特征数量太大,需要对特征进行降维,因此,再提取的TFIDF的基础上通过SVD算法(奇异值分解)分别对url、user_agent、body和refer进行降维,降维每个信息段只留存16维特征

    代码:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    from sklearn.feature_extraction.text import TfidfVectorizer
    from sklearn.decomposition import TruncatedSVD

    def add_tfidf_feats(df, col, n_components=16):
    """
    对指定列的文本数据进行 TF-IDF 特征提取,并添加到 DataFrame 中。

    Args:
    df (DataFrame): 要添加特征的 DataFrame 对象。
    col (str): 要处理的列名。
    n_components (int): 要提取的主成分数量,默认为 16。

    Returns:
    DataFrame: 添加了 TF-IDF 特征的 DataFrame 对象。
    """
    text = list(df[col].values)
    # 创建 TF-IDF 向量化器
    tf = TfidfVectorizer(min_df=1,
    analyzer='char_wb', # 使用字符级别的特征提取
    ngram_range=(1, 3), # 提取 1 到 3 个字符的 n-gram
    stop_words='english')
    tf.fit(text)
    # 转换文本数据为 TF-IDF 特征向量
    X = tf.transform(text)
    # 使用奇异值分解降维
    svd = TruncatedSVD(n_components=n_components)
    svd.fit(X)
    X_svd = svd.transform(X)
    # 将降维后的特征添加到 DataFrame 中
    for i in range(n_components):
    df[f'{col}_tfidf_{i}'] = X_svd[:, i]
    return df

    def add_tfidf_feats_word(df, col, n_components=16):
    """
    对指定列的文本数据进行 TF-IDF 特征提取(词级别),并添加到 DataFrame 中。

    Args:
    df (DataFrame): 要添加特征的 DataFrame 对象。
    col (str): 要处理的列名。
    n_components (int): 要提取的主成分数量,默认为 16。

    Returns:
    DataFrame: 添加了 TF-IDF 特征的 DataFrame 对象。
    """
    text = list(df[col].values)
    tf = TfidfVectorizer(min_df=1,
    analyzer='word', # 使用词级别的特征提取
    ngram_range=(1, 3), # 提取 1 到 3 个词的 n-gram
    stop_words='english')
    tf.fit(text)
    X = tf.transform(text)
    svd = TruncatedSVD(n_components=n_components)
    svd.fit(X)
    X_svd = svd.transform(X)
    for i in range(n_components):
    df[f'{col}_tfidf_word_{i}'] = X_svd[:, i]
    return df

    def add_tfidf_feats_char(df, col, n_components=16):
    """
    对指定列的文本数据进行 TF-IDF 特征提取(字符级别),并添加到 DataFrame 中。

    Args:
    df (DataFrame): 要添加特征的 DataFrame 对象。
    col (str): 要处理的列名。
    n_components (int): 要提取的主成分数量,默认为 16。

    Returns:
    DataFrame: 添加了 TF-IDF 特征的 DataFrame 对象。
    """
    text = list(df[col].values)
    tf = TfidfVectorizer(min_df=1,
    analyzer='char', # 使用字符级别的特征提取
    ngram_range=(1, 3), # 提取 1 到 3 个字符的 n-gram
    stop_words='english')
    tf.fit(text)
    X = tf.transform(text)
    svd = TruncatedSVD(n_components=n_components)
    svd.fit(X)
    X_svd = svd.transform(X)
    for i in range(n_components):
    df[f'{col}_tfidf_char_{i}'] = X_svd[:, i]
    return df

    # 对 'url_unquote', 'user_agent', 'body' 列进行 TF-IDF 特征提取
    df = add_tfidf_feats(df, 'url_unquote', n_components=16)
    df = add_tfidf_feats(df, 'user_agent', n_components=16)
    df = add_tfidf_feats(df, 'body', n_components=32)

    # 对 'url_unquote', 'user_agent', 'body' 列进行 TF-IDF 特征提取(词级别)
    df = add_tfidf_feats_word(df, 'url_unquote', n_components=16)
    df = add_tfidf_feats_word(df, 'user_agent', n_components=16)
    df = add_tfidf_feats_word(df, 'body', n_components=32)

    # 对 'url_unquote', 'user_agent', 'refer' 列进行 TF-IDF 特征提取(字符级别)
    df = add_tfidf_feats_char(df, 'url_unquote', n_components=16)
    df = add_tfidf_feats_char(df, 'user_agent', n_components=16)
    df = add_tfidf_feats_char(df, 'refer', n_components=32)

  • SMOTE重采样

    模型构建部分,对数据分析部分所发现的数据不平衡问题进行相应处理,采用SMOTE,Synthetic Minority Over-Sampling Technique,合成少数类过采样技术方法。

    SMOTE算法的思想是针对少数类样本,在它的K近邻中随机选择一个样本,并在特征空间中两个样本之间随机选择的点创建合成样本,使得合成样本保证差异性的同时且与原始样本相近,通过SMOTE算法的不平衡处理,使得所有类别的样本数量能够保持一致,便于后面模型的学习。
    alt text
    代码:

    1
    2
    3
    4
    5
    6
    trainX = train[use_features]
    trainY = train['label']
    from imblearn.over_sampling import SMOTE, ADASYN
    oversample = SMOTE()
    trainX, trainY = oversample.fit_resample(trainX, trainY)
    train = pd.concat([trainX, trainY.to_frame(name='label')],axis=1)

(3)模型训练

采用lgb模型,5折训练
代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
NUM_CLASSES = 6
FOLDS = 5
TARGET = 'label'

from sklearn.preprocessing import label_binarize
from lightgbm import log_evaluation, early_stopping

def run_lgb(df_train, df_test, use_features):

target = TARGET
oof_pred = np.zeros((len(df_train), NUM_CLASSES))
y_pred = np.zeros((len(df_test), NUM_CLASSES))

folds = StratifiedKFold(n_splits=FOLDS)
for fold, (tr_ind, val_ind) in enumerate(folds.split(train, train[TARGET])):
print(f'Fold {fold + 1}')
x_train, x_val = df_train[use_features].iloc[tr_ind], df_train[use_features].iloc[val_ind]
y_train, y_val = df_train[target].iloc[tr_ind], df_train[target].iloc[val_ind]
train_set = lgb.Dataset(x_train, y_train)
val_set = lgb.Dataset(x_val, y_val)

params = {
'learning_rate': 0.08,
'metric': 'multiclass',
'objective': 'multiclass',
'num_classes': NUM_CLASSES,
'feature_fraction': 0.75,
'bagging_fraction': 0.75,
'bagging_freq': 2,
'n_jobs': -1,
'seed': 330,
'max_depth': 12,
'num_leaves': 110,
'lambda_l1': 0.5,
'lambda_l2': 0.8,
'verbose': -1
}

model = lgb.train(params,
train_set,
num_boost_round=500,
callbacks = [log_evaluation(period=100), early_stopping(stopping_rounds=80)],
valid_sets=[train_set, val_set]
)
oof_pred[val_ind] = model.predict(x_val)
y_pred += model.predict(df_test[use_features]) / folds.n_splits

print("Features importance...")
gain = model.feature_importance('gain')
feat_imp = pd.DataFrame({'feature': model.feature_name(),
'split': model.feature_importance('split'),
'gain': 100 * gain / gain.sum()}).sort_values('gain', ascending=False)
print('Top 50 features:\n', feat_imp.head(50))

del x_train, x_val, y_train, y_val, train_set, val_set
gc.collect()

return y_pred, oof_pred


y_pred, oof_pred = run_lgb(train, test, use_features)

3. 结果与结论

最终提交结果分数为

alt text

其他文章