视频展示

一、实验准备

身为一个资深前端,半吊子后端,曾经开发过多个各具特色的用户登录和注册系统。下面是对每个系统的简短介绍。本次实验我将基于已有的基础,选取一个登录注册界面使用Python进行重写,更加侧重于数据库层面的深入探讨与理解。

略过吹牛逼环节...

在本次实验中,将选取上述列出的系统3️⃣进行重写,前端VUE代码使用原生三件套进行重写,后端Java代码使用Python进行重写,并且去除框架SpringBoot的使用,更加专注于界面请求与数据库之间的直接操作,同时导致的安全性下降,也更方便从底层探究SQL注入的可能以及防御。

二、前端界面搭建

使用原生三件套进行重写可以避免登陆注册系统太过臃肿,前端代码更加简洁,并且不使用现成的组件库进行通信,从而可以更专注于实验内容。

image-20240523164938896
image-20240523164921083

image-20240523164902898
image-20240523165539795

 <html lang="zh-CN">
 ​
 <head>
     <meta charset="UTF-8">
     <meta name="viewport" content="width=device-width, initial-scale=1.0">
     <title>登陆注册系统</title>
     <link rel="icon" href="https://lovexl.top/upload/头像去背景.png" type="image/x-icon">
 </head>
 ​
 <div class="mainContainer">
     <div class="title1">数据库实验:登陆注册系统</div>
     <div class="title2">作者:郭航江</div>
     <div>
         <form class="form" id="loginForm">
             <ul class="wrapper">
                 <li style="--i:4;"><input class="input" id="username" type="text" placeholder="用户名" required></li>
                 <!-- 确认为必填项 -->
                 <li style="--i:3;"><input class="input" id="password" type="password" placeholder="密码" required></li>
                 <!-- 确认为必填项 -->
                 <li style="--i:2;"><input class="input" id="inviteCode" type="text" placeholder="没有账号?输入邀请码注册"></li>
                 <!-- 邀请码非必填 -->
                 <button style="--i:1;" id="submitButton">登录</button>
             </ul>
         </form>
     </div>
 </div>
 ​
 </html>
 ​
 <script>
     document.getElementById('inviteCode').addEventListener('input', function () {
         var submitButton = document.getElementById('submitButton')
         if (this.value) {
             submitButton.textContent = '注册'
         } else {
             submitButton.textContent = '登录'
         }
     })
 ​
     document.getElementById('loginForm').addEventListener('submit', function (event) {
         event.preventDefault()
 ​
         var username = document.getElementById('username').value
         var password = document.getElementById('password').value
         var inviteCode = document.getElementById('inviteCode').value
         var url = inviteCode ? 'http://localhost:8000/register' : 'http://localhost:8000/login';
 ​
         var data = {
             username: username,
             password: password
         }
 ​
         if (inviteCode) {
             data.inviteCode = inviteCode
         }
 ​
         fetch(url, {
             method: 'POST',
             headers: {
                 'Content-Type': 'application/json'
             },
             body: JSON.stringify(data)
         })
             .then(response => {
                 if (response.status === 200) {
                     alert(inviteCode ? '注册成功' : '登录成功')
                 } else if (response.status === 403) {
                     alert(inviteCode ? '注册失败' : '登录失败')
                 } else {
                     alert('未知错误,请稍后再试')
                 }
                 return response.json()
             })
             .then(data => {
                 console.log('Success:', data)
                 // 根据实际需求处理响应数据
             })
             .catch((error) => {
                 console.error('Error:', error)
                 alert('网络错误,请稍后再试')
             })
     });
 </script>
 ​
 <style>
     body,
     html {
         height: 100%;
         margin: 0;
         display: flex;
         justify-content: center;
         align-items: center;
         background-color: #212121;
     }
 ​
     .input,
     button {
         width: 100%;
         height: 40px;
         position: relative;
         padding: 10px;
         border: 0.1px solid #575cb5;
     }
 ​
     button {
         background: #6d74e3;
         border: none;
     }
 ​
     .wrapper {
         position: relative;
         transform: skewY(-14deg);
     }
 ​
     .wrapper li,
     button {
         position: relative;
         list-style: none;
         width: 200px;
         z-index: var(--i);
         transition: 0.3s;
         color: white;
     }
 ​
     .wrapper li::before,
     button::before {
         position: absolute;
         content: '';
         background: #6d74e3;
         top: 0;
         left: -40px;
         width: 40px;
         height: 40px;
         transform-origin: right;
         transform: skewY(45deg);
         transition: 0.3s;
     }
 ​
     .wrapper li::after,
     button::after {
         position: absolute;
         content: '';
         background: #6d74e3;
         width: 200px;
         height: 40px;
         top: -40px;
         left: 0;
         transform-origin: bottom;
         transform: skewX(45deg);
         transition: 0.3s;
     }
 ​
     .wrapper li:nth-child(1)::after,
     .wrapper li:nth-child(1)::before {
         background-color: #d8daf7;
     }
 ​
     .wrapper li:nth-child(2)::after,
     .wrapper li:nth-child(2)::before {
         background-color: #c2c5f3;
     }
 ​
     .wrapper li:nth-child(3)::after,
     .wrapper li:nth-child(3)::before {
         background-color: #989deb;
     }
 ​
     li .input {
         outline: none;
         border: none;
         color: black;
     }
 ​
     li .input::placeholder {
         color: black;
     }
 ​
     li:nth-child(1) .input {
         background: #d8daf7;
     }
 ​
     li:nth-child(2) .input {
         background: #c2c5f3;
     }
 ​
     li:nth-child(3) .input {
         background: #989deb;
     }
 ​
 ​
     li:nth-child(1) .input:focus {
         outline: none;
         border: 3.5px solid #d8daf7;
     }
 ​
     li:nth-child(2) .input:focus {
         outline: none;
         border: 3.5px solid #c2c5f3;
     }
 ​
     li:nth-child(3) .input:focus {
         outline: none;
         border: 3.5px solid #989deb;
     }
 ​
     .wrapper li:hover,
     button:hover {
         transform: translateX(-20px);
     }
 ​
     button:hover,
     button:hover::before,
     button:hover::after {
         background: #575cb5;
     }
 ​
     button:active {
         transform: translateX(0px);
     }
 ​
     .title1 {
         color: white;
         font-weight: bolder;
         font-size: 16px;
         transform: skewY(-14deg);
     }
 ​
     .title2 {
         color: #7373E2;
         font-weight: bolder;
         margin-bottom: 50px;
         font-size: 14px;
         transform: skewY(-14deg);
     }
 </style>

其中,用于向后端发送请求的代码为:

 document.getElementById('loginForm').addEventListener('submit', function (event) {
         event.preventDefault()
 ​
         var username = document.getElementById('username').value
         var password = document.getElementById('password').value
         var inviteCode = document.getElementById('inviteCode').value
         var url = inviteCode ? 'http://localhost:8000/register' : 'http://localhost:8000/login';
 ​
         var data = {
             username: username,
             password: password
         }
 ​
         if (inviteCode) {
             data.inviteCode = inviteCode
         }
 ​
         fetch(url, {
             method: 'POST',
             headers: {
                 'Content-Type': 'application/json'
             },
             body: JSON.stringify(data)
         })
             .then(response => {
                 if (response.status === 200) {
                     alert(inviteCode ? '注册成功' : '登录成功')
                 } else if (response.status === 403) {
                     alert(inviteCode ? '注册失败,邀请码错误' : '登录失败')
                 } else {
                     alert('未知错误,请稍后再试')
                 }
                 return response.json()
             })
             .then(data => {
                 console.log('Success:', data)
                 // 根据实际需求处理响应数据
             })
             .catch((error) => {
                 console.error('Error:', error)
                 alert('网络错误,请稍后再试')
             })
     });

三、后端以及数据库搭建

 import http.server  # 导入HTTP服务器模块
 import socketserver  # 导入套接字服务器模块
 import json  # 导入JSON模块,用于处理JSON数据
 from urllib.parse import urlparse  # 导入URL解析模块
 import pymysql  # 导入pymysql模块,用于连接和操作MySQL数据库
 ​
 PORT = 8000  # 设置服务器监听的端口号
 INVITE_CODE = "114514"  # 设置注册时需要的邀请码
 ​
 # 初始化数据库和表
 def initialize_database():
     try:
         # 连接MySQL数据库
         con = pymysql.connect(
             host="localhost",  # 数据库主机名
             port=3306,  # 数据库端口号
             user="root",  # 数据库用户名
             password="******",  # 数据库密码
             database="LoginRegistrationSystem"  # 数据库名称
         )
         cursor = con.cursor()  # 创建游标对象
         # 创建表格,如果不存在则创建
         cursor.execute("""
             CREATE TABLE IF NOT EXISTS users (
                 id INT AUTO_INCREMENT PRIMARY KEY,
                 username VARCHAR(50) UNIQUE NOT NULL,
                 password VARCHAR(50) NOT NULL,
                 invite_code VARCHAR(20)
             )
         """)
         con.commit()  # 提交数据库操作
     except Exception as e:
         print("Exception during database initialization:", e)  # 打印异常信息
     finally:
         cursor.close()  # 关闭游标
         con.close()  # 关闭数据库连接
 ​
 initialize_database()  # 调用初始化数据库和表的函数
 ​
 class RequestHandler(http.server.SimpleHTTPRequestHandler):
     def do_POST(self):
         content_length = int(self.headers['Content-Length'])  # 获取请求内容的长度
         post_data = self.rfile.read(content_length)  # 读取请求内容
         data = json.loads(post_data)  # 将请求内容解析为JSON格式
 ​
         parsed_path = urlparse(self.path)  # 解析请求路径
         if parsed_path.path == '/login':
             self.handle_login(data)  # 如果路径是/login,则处理登录请求
         elif parsed_path.path == '/register':
             self.handle_register(data)  # 如果路径是/register,则处理注册请求
         else:
             self.send_response(404)  # 如果路径不匹配,返回404错误
             self.end_headers()
             self.wfile.write(b'Not Found')  # 返回"Not Found"信息
 ​
     def handle_login(self, data):
         username = data['username']  # 获取用户名
         password = data['password']  # 获取密码
 ​
         try:
             # 连接MySQL数据库
             con = pymysql.connect(
                 host="localhost",
                 port=3306,
                 user="root",
                 password="******",
                 database="LoginRegistrationSystem"
             )
             cursor = con.cursor()  # 创建游标对象
             # 查询用户名和密码是否匹配
             cursor.execute("SELECT * FROM users WHERE username=%s AND password=%s", (username, password))
             result = cursor.fetchone()  # 获取查询结果
             if result:
                 self.send_response(200)  # 如果匹配,返回200状态码
                 self.end_headers()
                 self.wfile.write(b'Login Successful')  # 返回"Login Successful"信息
             else:
                 self.send_response(403)  # 如果不匹配,返回403状态码
                 self.end_headers()
                 self.wfile.write(b'Login Failed')  # 返回"Login Failed"信息
         except Exception as e:
             self.send_response(500)  # 如果发生异常,返回500状态码
             self.end_headers()
             self.wfile.write(b'Internal Server Error')  # 返回"Internal Server Error"信息
             print("Exception:", e)  # 打印异常信息
         finally:
             cursor.close()  # 关闭游标
             con.close()  # 关闭数据库连接
 ​
     def handle_register(self, data):
         username = data['username']  # 获取用户名
         password = data['password']  # 获取密码
         invite_code = data.get('inviteCode')  # 获取邀请码
 ​
         if invite_code != INVITE_CODE:  # 检查邀请码是否正确
             self.send_response(403)  # 如果邀请码不正确,返回403状态码
             self.end_headers()
             self.wfile.write(b'Invalid Invite Code')  # 返回"Invalid Invite Code"信息
             return
 ​
         try:
             # 连接MySQL数据库
             con = pymysql.connect(
                 host="localhost",
                 port=3306,
                 user="root",
                 password="******",
                 database="LoginRegistrationSystem"
             )
             cursor = con.cursor()  # 创建游标对象
             # 查询用户名是否已存在
             cursor.execute("SELECT * FROM users WHERE username=%s", (username,))
             result = cursor.fetchone()  # 获取查询结果
             if result:
                 self.send_response(403)  # 如果用户名已存在,返回403状态码
                 self.end_headers()
                 self.wfile.write(b'Username Already Exists')  # 返回"Username Already Exists"信息
             else:
                 # 插入新的用户记录
                 cursor.execute("INSERT INTO users (username, password, invite_code) VALUES (%s, %s, %s)", (username, password, invite_code))
                 con.commit()  # 提交数据库操作
                 self.send_response(200)  # 返回200状态码
                 self.end_headers()
                 self.wfile.write(b'Registration Successful')  # 返回"Registration Successful"信息
         except Exception as e:
             self.send_response(500)  # 如果发生异常,返回500状态码
             self.end_headers()
             self.wfile.write(b'Internal Server Error')  # 返回"Internal Server Error"信息
             print("Exception:", e)  # 打印异常信息
         finally:
             cursor.close()  # 关闭游标
             con.close()  # 关闭数据库连接
 ​
 Handler = RequestHandler  # 设置请求处理器
 ​
 with socketserver.TCPServer(("", PORT), Handler) as httpd:
     print("serving at port", PORT)  # 打印服务器正在监听的端口信息
     httpd.serve_forever()  # 启动服务器,保持运行

产生的问题1

问题描述:在测试运行时,会一直报错501,系统无法正常运行。

问题解决:浏览器在发送CORS(跨域资源共享)请求时,会首先发送一个OPTIONS请求,以检查服务器是否允许跨域请求。当前的Python服务器未处理OPTIONS请求,因此返回501错误(不支持的方法)。因此我们需要在后端代码中加入对于OPTIONS请求的处理,并正确响应跨域请求。

以下是改进后在请求处理器类中添加的代码:

 # 处理OPTIONS请求,用于跨域请求预检
     def do_OPTIONS(self):
         self.send_response(200, "ok")  # 响应200状态码
         self.send_header('Access-Control-Allow-Origin', '*')  # 设置允许所有域名访问
         self.send_header('Access-Control-Allow-Methods', 'POST, GET, OPTIONS')  # 设置允许的方法
         self.send_header("Access-Control-Allow-Headers", "Content-Type")  # 设置允许的请求头
         self.end_headers()  # 结束响应

产生的问题2

问题描述:能够进行正常登录与注册,但是注册完毕后还会弹出”网络错误,请稍后再试“。

问题解决:前端代码在执行.then(response => ...)之后,会尝试调用response.json()。我们的Python服务器返回的不是有效的JSON响应,response.json()会抛出一个错误,这会导致.catch块中的错误处理程序被触发,显示“网络错误,请稍后再试”。

所以我们需要对后端Python代码进行改进,使其返回JSON响应,以下便是我们的最终版本

 import http.server  # 导入处理HTTP请求的模块
 import socketserver  # 导入处理TCP连接的模块
 import json  # 导入处理JSON数据的模块
 from urllib.parse import urlparse  # 导入解析URL的模块
 import pymysql  # 导入处理MySQL数据库的模块
 ​
 PORT = 8000  # 定义服务器监听的端口
 INVITE_CODE = "114514"  # 定义注册所需的邀请码
 ​
 # 初始化数据库和表
 def initialize_database():
     try:
         con = pymysql.connect(
             host="localhost",  # 数据库主机地址
             port=3306,  # 数据库端口
             user="root",  # 数据库用户名
             password="******",  # 数据库密码
             database="LoginRegistrationSystem"  # 数据库名称
         )
         cursor = con.cursor()  # 创建一个游标对象
         # 创建用户表,如果不存在
         cursor.execute("""
             CREATE TABLE IF NOT EXISTS users (
                 id INT AUTO_INCREMENT PRIMARY KEY,
                 username VARCHAR(50) UNIQUE NOT NULL,
                 password VARCHAR(50) NOT NULL,
                 invite_code VARCHAR(20)
             )
         """)
         con.commit()  # 提交数据库操作
     except Exception as e:
         print("Exception during database initialization:", e)  # 捕获并打印异常
     finally:
         cursor.close()  # 关闭游标
         con.close()  # 关闭数据库连接
 ​
 initialize_database()  # 调用初始化函数
 ​
 # 定义请求处理类
 class RequestHandler(http.server.SimpleHTTPRequestHandler):
     # 处理OPTIONS请求,用于跨域请求预检
     def do_OPTIONS(self):
         self.send_response(200, "ok")
         self.send_header('Access-Control-Allow-Origin', '*')
         self.send_header('Access-Control-Allow-Methods', 'POST, GET, OPTIONS')
         self.send_header("Access-Control-Allow-Headers", "Content-Type")
         self.end_headers()
 ​
     # 处理POST请求
     def do_POST(self):
         content_length = int(self.headers['Content-Length'])  # 获取请求体长度
         post_data = self.rfile.read(content_length)  # 读取请求体
         data = json.loads(post_data)  # 解析JSON数据
 ​
         parsed_path = urlparse(self.path)  # 解析请求路径
         if parsed_path.path == '/login':  # 如果路径是/login,处理登录请求
             self.handle_login(data)
         elif parsed_path.path == '/register':  # 如果路径是/register,处理注册请求
             self.handle_register(data)
         else:
             self.send_response(404)  # 路径未找到
             self.send_header('Content-Type', 'application/json')
             self.send_header('Access-Control-Allow-Origin', '*')
             self.end_headers()
             self.wfile.write(json.dumps({'message': 'Not Found'}).encode('utf-8'))
 ​
     # 处理登录请求
     def handle_login(self, data):
         username = data['username']  # 获取用户名
         password = data['password']  # 获取密码
 ​
         try:
             con = pymysql.connect(
                 host="localhost",
                 port=3306,
                 user="root",
                 password="******",
                 database="LoginRegistrationSystem"
             )
             cursor = con.cursor()
             cursor.execute("SELECT * FROM users WHERE username=%s AND password=%s", (username, password))  # 查询用户
             result = cursor.fetchone()  # 获取查询结果
             if result:
                 self.send_response(200)  # 登录成功
                 self.send_header('Content-Type', 'application/json')
                 self.send_header('Access-Control-Allow-Origin', '*')
                 self.end_headers()
                 self.wfile.write(json.dumps({'message': 'Login Successful'}).encode('utf-8'))
             else:
                 self.send_response(403)  # 登录失败
                 self.send_header('Content-Type', 'application/json')
                 self.send_header('Access-Control-Allow-Origin', '*')
                 self.end_headers()
                 self.wfile.write(json.dumps({'message': 'Login Failed'}).encode('utf-8'))
         except Exception as e:
             self.send_response(500)  # 内部服务器错误
             self.send_header('Content-Type', 'application/json')
             self.send_header('Access-Control-Allow-Origin', '*')
             self.end_headers()
             self.wfile.write(json.dumps({'message': 'Internal Server Error'}).encode('utf-8'))
             print("Exception:", e)  # 打印异常信息
         finally:
             cursor.close()  # 关闭游标
             con.close()  # 关闭数据库连接
 ​
     # 处理注册请求
     def handle_register(self, data):
         username = data['username']  # 获取用户名
         password = data['password']  # 获取密码
         invite_code = data.get('inviteCode')  # 获取邀请码
 ​
         if invite_code != INVITE_CODE:  # 验证邀请码
             self.send_response(403)  # 邀请码无效
             self.send_header('Content-Type', 'application/json')
             self.send_header('Access-Control-Allow-Origin', '*')
             self.end_headers()
             self.wfile.write(json.dumps({'message': 'Invalid Invite Code'}).encode('utf-8'))
             return
 ​
         try:
             con = pymysql.connect(
                 host="localhost",
                 port=3306,
                 user="root",
                 password="******",
                 database="LoginRegistrationSystem"
             )
             cursor = con.cursor()
             cursor.execute("SELECT * FROM users WHERE username=%s", (username,))  # 检查用户名是否存在
             result = cursor.fetchone()
             if result:
                 self.send_response(403)  # 用户名已存在
                 self.send_header('Content-Type', 'application/json')
                 self.send_header('Access-Control-Allow-Origin', '*')
                 self.end_headers()
                 self.wfile.write(json.dumps({'message': 'Username Already Exists'}).encode('utf-8'))
             else:
                 # 插入新用户记录
                 cursor.execute("INSERT INTO users (username, password, invite_code) VALUES (%s, %s, %s)", (username, password, invite_code))
                 con.commit()  # 提交数据库操作
                 self.send_response(200)  # 注册成功
                 self.send_header('Content-Type', 'application/json')
                 self.send_header('Access-Control-Allow-Origin', '*')
                 self.end_headers()
                 self.wfile.write(json.dumps({'message': 'Registration Successful'}).encode('utf-8'))
         except Exception as e:
             self.send_response(500)  # 内部服务器错误
             self.send_header('Content-Type', 'application/json')
             self.send_header('Access-Control-Allow-Origin', '*')
             self.end_headers()
             self.wfile.write(json.dumps({'message': 'Internal Server Error'}).encode('utf-8'))
             print("Exception:", e)  # 打印异常信息
         finally:
             cursor.close()  # 关闭游标
             con.close()  # 关闭数据库连接
 ​
 Handler = RequestHandler  # 创建请求处理实例
 ​
 # 创建服务器并启动
 with socketserver.TCPServer(("", PORT), Handler) as httpd:
     print("serving at port", PORT)  # 打印服务器启动信息
     httpd.serve_forever()  # 启动服务器

四、完整系统展示

视频展示见文章开头

1.登录功能

启动后端,监听8000端口,截图如下:

image-20240525161729277

已有数据库截图如下:

image-20240525161801458

登录其中的test1,测试登录功能:

image-20240525161829812

登录数据库中没有的test3,测试如下:

image-20240525161900287

2.注册功能

添加了邀请码机制,只有填入正确的邀请码114514才可以注册成功。

image-20240525162016399

image-20240525162031676

注册成功后查看数据库,发现test3的用户数据成功添加:

image-20240525162106842

五、执行SQL注入攻击

首先我们分析关键后端代码:

  • 处理登录代码:

     # 处理登录请求
         def handle_login(self, data):
             username = data['username']  # 获取用户名
             password = data['password']  # 获取密码
     ​
             try:
                 con = pymysql.connect(
                     host="localhost",
                     port=3306,
                     user="root",
                     password="******",
                     database="LoginRegistrationSystem"
                 )
                 cursor = con.cursor()
                 cursor.execute("SELECT * FROM users WHERE username=%s AND password=%s", (username, password))  # 查询用户
                 result = cursor.fetchone()  # 获取查询结果
                 if result:
                     self.send_response(200)  # 登录成功
                     self.send_header('Content-Type', 'application/json')
                     self.send_header('Access-Control-Allow-Origin', '*')
                     self.end_headers()
                     self.wfile.write(json.dumps({'message': 'Login Successful'}).encode('utf-8'))
                 else:
                     self.send_response(403)  # 登录失败
                     self.send_header('Content-Type', 'application/json')
                     self.send_header('Access-Control-Allow-Origin', '*')
                     self.end_headers()
                     self.wfile.write(json.dumps({'message': 'Login Failed'}).encode('utf-8'))
             except Exception as e:
                 self.send_response(500)  # 内部服务器错误
                 self.send_header('Content-Type', 'application/json')
                 self.send_header('Access-Control-Allow-Origin', '*')
                 self.end_headers()
                 self.wfile.write(json.dumps({'message': 'Internal Server Error'}).encode('utf-8'))
                 print("Exception:", e)  # 打印异常信息
             finally:
                 cursor.close()  # 关闭游标
                 con.close()  # 关闭数据库连接
  • 处理注册代码:

     INVITE_CODE = "114514"  # 定义注册所需的邀请码
     # 处理注册请求
         def handle_register(self, data):
             username = data['username']  # 获取用户名
             password = data['password']  # 获取密码
             invite_code = data.get('inviteCode')  # 获取邀请码
     ​
             if invite_code != INVITE_CODE:  # 验证邀请码
                 self.send_response(403)  # 邀请码无效
                 self.send_header('Content-Type', 'application/json')
                 self.send_header('Access-Control-Allow-Origin', '*')
                 self.end_headers()
                 self.wfile.write(json.dumps({'message': 'Invalid Invite Code'}).encode('utf-8'))
                 return
     ​
             try:
                 con = pymysql.connect(
                     host="localhost",
                     port=3306,
                     user="root",
                     password="******",
                     database="LoginRegistrationSystem"
                 )
                 cursor = con.cursor()
                 cursor.execute("SELECT * FROM users WHERE username=%s", (username,))  # 检查用户名是否存在
                 result = cursor.fetchone()
                 if result:
                     self.send_response(403)  # 用户名已存在
                     self.send_header('Content-Type', 'application/json')
                     self.send_header('Access-Control-Allow-Origin', '*')
                     self.end_headers()
                     self.wfile.write(json.dumps({'message': 'Username Already Exists'}).encode('utf-8'))
                 else:
                     # 插入新用户记录
                     cursor.execute("INSERT INTO users (username, password, invite_code) VALUES (%s, %s, %s)", (username, password, invite_code))
                     con.commit()  # 提交数据库操作
                     self.send_response(200)  # 注册成功
                     self.send_header('Content-Type', 'application/json')
                     self.send_header('Access-Control-Allow-Origin', '*')
                     self.end_headers()
                     self.wfile.write(json.dumps({'message': 'Registration Successful'}).encode('utf-8'))
             except Exception as e:
                 self.send_response(500)  # 内部服务器错误
                 self.send_header('Content-Type', 'application/json')
                 self.send_header('Access-Control-Allow-Origin', '*')
                 self.end_headers()
                 self.wfile.write(json.dumps({'message': 'Internal Server Error'}).encode('utf-8'))
                 print("Exception:", e)  # 打印异常信息
             finally:
                 cursor.close()  # 关闭游标
                 con.close()  # 关闭数据库连接

我们可以先执行一些简单的SQL注入指令如下:

1.使用单引号关闭字符串

  • 输入:

    • 用户名: test1' --

    • 密码: anything

  • 这里的 -- 是 SQL 中的注释符号,意味着它后面的部分会被忽略。这将导致查询变为:

     SELECT * FROM users WHERE username='admin' -- ' AND password='anything'

image-20240525164405900

2.使用 OR 1=1 始终为真

  • 输入:

    • 用户名: test1' OR '1'='1

    • 密码: anything

  • 解释: 这将导致查询变为:

    SELECT * FROM users WHERE username='admin' OR '1'='1' AND password='anything'

image-20240525164438216

经测试,发现常规的SQL注入都不管用,原因是我在代码中使用了预处理语句,例如:

cursor.execute("SELECT * FROM users WHERE username=%s AND password=%s", (username, password))

他可以可以成功避免 SQL 注入攻击的原因在于它们将 SQL 查询和数据分开处理,确保数据不会被当作代码执行。%s 是占位符,usernamepassword 是参数。无论用户输入什么,输入都会被视为数据,不会作为 SQL 代码执行。

六、系统安全性增强

1.使用哈希和盐存储密码

正如之前展示的一样,现在系统中密码是以明文形式存储的,这是不安全的,可以使用哈希和盐来存储密码。

这里我使用 bcrypt 库来实现这一点。

import bcrypt

# 哈希密码
def hash_password(password):
    salt = bcrypt.gensalt()
    hashed_password = bcrypt.hashpw(password.encode('utf-8'), salt)
    return hashed_password

# 验证密码
def check_password(hashed_password, user_password):
    return bcrypt.checkpw(user_password.encode('utf-8'), hashed_password)

修改 handle_register 方法来存储哈希后的密码:

def handle_register(self, data):
    username = data['username']
    password = data['password']
    invite_code = data.get('inviteCode')

    if invite_code != INVITE_CODE:
        self.send_response(403)
        self.send_header('Content-Type', 'application/json')
        self.send_header('Access-Control-Allow-Origin', '*')
        self.end_headers()
        self.wfile.write(json.dumps({'message': 'Invalid Invite Code'}).encode('utf-8'))
        return

    try:
        con = pymysql.connect(
            host="localhost",
            port=3306,
            user="root",
            password="******",
            database="LoginRegistrationSystem"
        )
        cursor = con.cursor()
        cursor.execute("SELECT * FROM users WHERE username=%s", (username,))
        result = cursor.fetchone()
        if result:
            self.send_response(403)
            self.send_header('Content-Type', 'application/json')
            self.send_header('Access-Control-Allow-Origin', '*')
            self.end_headers()
            self.wfile.write(json.dumps({'message': 'Username Already Exists'}).encode('utf-8'))
        else:
            hashed_password = hash_password(password)
            cursor.execute("INSERT INTO users (username, password, invite_code) VALUES (%s, %s, %s)", (username, hashed_password, invite_code))
            con.commit()
            self.send_response(200)
            self.send_header('Content-Type', 'application/json')
            self.send_header('Access-Control-Allow-Origin', '*')
            self.end_headers()
            self.wfile.write(json.dumps({'message': 'Registration Successful'}).encode('utf-8'))
    except Exception as e:
        self.send_response(500)
        self.send_header('Content-Type', 'application/json')
        self.send_header('Access-Control-Allow-Origin', '*')
        self.end_headers()
        self.wfile.write(json.dumps({'message': 'Internal Server Error'}).encode('utf-8'))
        print("Exception:", e)
    finally:
        cursor.close()
        con.close()

修改 handle_login 方法来验证哈希后的密码:

 def handle_login(self, data):
     username = data['username']
     password = data['password']
 ​
     try:
         con = pymysql.connect(
             host="localhost",
             port=3306,
             user="root",
             password="******",
             database="LoginRegistrationSystem"
         )
         cursor = con.cursor()
         cursor.execute("SELECT password FROM users WHERE username=%s", (username,))
         result = cursor.fetchone()
         if result and check_password(result[0].encode('utf-8'), password):
             self.send_response(200)
             self.send_header('Content-Type', 'application/json')
             self.send_header('Access-Control-Allow-Origin', '*')
             self.end_headers()
             self.wfile.write(json.dumps({'message': 'Login Successful'}).encode('utf-8'))
         else:
             self.send_response(403)
             self.send_header('Content-Type', 'application/json')
             self.send_header('Access-Control-Allow-Origin', '*')
             self.end_headers()
             self.wfile.write(json.dumps({'message': 'Login Failed'}).encode('utf-8'))
     except Exception as e:
         self.send_response(500)
         self.send_header('Content-Type', 'application/json')
         self.send_header('Access-Control-Allow-Origin', '*')
         self.end_headers()
         self.wfile.write(json.dumps({'message': 'Internal Server Error'}).encode('utf-8'))
         print("Exception:", e)
     finally:
         cursor.close()
         con.close()

2.HTTPS

这个就显而易见了,本地测试也都走的是HTTP,想要更安全可以使用HTTPS加密传输数据。

3.输入验证,防止弱密码

为了在前后端代码中都添加对于设置密码的验证,要求密码至少包含10位字符,并且包含数字与字母,我们需要对前端的表单进行验证,并在后端再次进行验证以确保安全。

前端代码修改

 if (inviteCode && !isValidPassword(password)) {
             alert('密码至少包含10位字符,并且包含数字与字母')
             return
         }
 ​
 function isValidPassword(password) {
         const minLength = 10
         const hasNumber = /\d/.test(password)
         const hasLetter = /[a-zA-Z]/.test(password)
         return password.length >= minLength && hasNumber && hasLetter
     }

后端代码修改

     def is_valid_password(self, password):
         if len(password) < 10:
             return False
         if not re.search(r'\d', password):
             return False
         if not re.search(r'[a-zA-Z]', password):
             return False
         return True
     
     if not self.is_valid_password(password):
             self.send_response(400)
             self.send_header('Content-Type', 'application/json')
             self.send_header('Access-Control-Allow-Origin', '*')
             self.end_headers()
             self.wfile.write(json.dumps({'message': 'Password must be at least 10 characters long and contain both letters and numbers'}).encode('utf-8'))
             return