import base64
import json
import random
import time
from io import BytesIO

import requests
from PIL import Image

# Optional dependency: ddddocr reads the text captcha shown at login.
try:
    import ddddocr
except ImportError:
    OCR_AVAILABLE = False
    print("警告: 未安装ddddocr库,验证码识别功能将不可用")
    print("请运行: pip install ddddocr")
else:
    OCR_AVAILABLE = True

# Optional dependency: captcha-recognizer locates the gap in slider captchas.
try:
    from captcha_recognizer.slider import Slider
except ImportError:
    CAPTCHA_RECOGNIZER_AVAILABLE = False
    print("⚠ 未安装captcha-recognizer,将使用手动方式识别缺口")
    print(" 安装命令: pip install captcha-recognizer")
else:
    CAPTCHA_RECOGNIZER_AVAILABLE = True
    print("✓ captcha-recognizer 已加载,支持自动识别滑块缺口")
# One shared session so cookies obtained by earlier requests are sent with later ones.
session = requests.Session()

# Optional HTTP(S) proxy; both schemes are routed to the same local address.
USE_PROXY = False
PROXY_CONFIG = dict.fromkeys(("http", "https"), "http://127.0.0.1:8083")

if USE_PROXY:
    import warnings

    from urllib3.exceptions import InsecureRequestWarning

    session.proxies.update(PROXY_CONFIG)
    # Certificate verification is turned off while proxying, so the matching
    # warning is silenced as well.
    session.verify = False
    warnings.filterwarnings('ignore', category=InsecureRequestWarning)
    print(f"✓ HTTP代理已启用: {PROXY_CONFIG['http']}")
    print(" 提示: 请确保代理工具正在运行")

# Placeholder: replace with the address of the student learning system.
BASE_URL = "https://修改为学生学习系统地址"

# Placeholder credentials used by login().
USERNAME = PASSWORD = "xxxxxx"

# Dynamic values filled in after login (see login / update_dynamic_params).
edu_token = edu_sign = edu_cid = user_account_id = None

# Manual eduSign override; when set, create_api_headers prefers it over edu_sign.
EDU_SIGN_CONFIG = None
def create_base_headers(content_type=None, extra_headers=None):
    """Build the header set shared by every request.

    Args:
        content_type: Optional ``Content-Type`` value, e.g.
            ``'application/json'`` or
            ``'application/x-www-form-urlencoded'``. Skipped when falsy.
        extra_headers: Optional mapping merged in last, so its entries
            override the defaults (and ``Content-Type``) on key clashes.

    Returns:
        A new dict containing the complete headers.
    """
    user_agent = (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/146.0.0.0 Safari/537.36 Edg/146.0.0.0"
    )
    merged = {
        "User-Agent": user_agent,
        "X-Requested-With": "XMLHttpRequest",
        "Origin": BASE_URL,
        "Referer": f"{BASE_URL}/",
    }
    if content_type:
        merged["Content-Type"] = content_type
    # An absent/empty extra_headers degrades to a no-op update.
    merged.update(extra_headers or {})
    return merged
def create_api_headers(content_type="application/json", extra_headers=None):
    """Build headers for API calls, adding the authentication fields.

    Starts from ``create_base_headers`` and then adds ``eduUId``,
    ``eduToken``, ``eduCId`` and ``eduSign`` for whichever of the
    corresponding module-level values are currently set.

    Args:
        content_type: ``Content-Type`` value passed to the base builder.
        extra_headers: Optional extra headers passed to the base builder.

    Returns:
        The complete headers dict.
    """
    api_headers = create_base_headers(content_type=content_type, extra_headers=extra_headers)
    auth_fields = (
        ("eduUId", user_account_id),
        ("eduToken", edu_token),
        ("eduCId", edu_cid),
        # The manually configured sign takes precedence over the scraped one.
        ("eduSign", EDU_SIGN_CONFIG or edu_sign),
    )
    for header_name, header_value in auth_fields:
        if header_value:
            api_headers[header_name] = header_value
    return api_headers
def get_verify_code():
    """Download the login captcha image and try to read it with OCR.

    The image is fetched through the shared session (so cookies set by the
    response are kept) and written to ``verify_code.png``.

    Returns:
        The recognised captcha text, or ``None`` when ddddocr is not
        available or recognition raised an exception.

    Raises:
        Exception: If the captcha endpoint does not answer with HTTP 200.
    """
    # Millisecond timestamp used as the query string of the captcha URL.
    stamp = int(time.time() * 1000)
    reply = session.get(f"{BASE_URL}/verifycode?{stamp}", headers=create_base_headers())
    if reply.status_code != 200:
        raise Exception(f"获取验证码失败: {reply.status_code}")

    image_bytes = reply.content
    with open("verify_code.png", "wb") as image_file:
        image_file.write(image_bytes)
    print("验证码图片已保存到 verify_code.png")

    if not OCR_AVAILABLE:
        return None
    try:
        recognised = ddddocr.DdddOcr(show_ad=False).classification(image_bytes)
        print(f"自动识别验证码: {recognised}")
        return recognised
    except Exception as e:
        print(f"自动识别失败: {e}")
    return None
def handle_slider_captcha():
    """Request a slider captcha, locate its gap and submit a drag track.

    Steps: fetch a challenge from the captcha ``gen`` endpoint, save and
    scale the background image, let captcha-recognizer find the gap, build
    a track with ``generate_slider_track`` and post it to the ``check``
    endpoint.

    Returns:
        The ``id`` found under ``data`` in a successful check response, or
        ``None`` on any failure (bad HTTP status, wrong captcha type,
        missing/undecodable image, recogniser unavailable or not confident,
        check rejected).
    """
    print("\n检测到滑块验证码,开始处理...")
    import datetime
    # The reported sliding start time is taken before the challenge is fetched.
    start_sliding_time = datetime.datetime.now(datetime.timezone.utc)
    gen_url = f"{BASE_URL}/student/dispatch/captcha/gen?type=RANDOM"
    headers = create_base_headers(content_type="application/json;charset=UTF-8")
    response = session.post(gen_url, headers=headers, json={})
    if response.status_code != 200:
        print(f"✗ 获取滑块验证码失败: {response.status_code}")
        return None
    captcha_data = response.json()
    captcha_id = captcha_data.get('id')
    captcha_info = captcha_data.get('captcha', {})
    # Only challenges whose type is 'SLIDER' are handled.
    if not captcha_id or captcha_info.get('type') != 'SLIDER':
        print("✗ 不是滑块验证码类型")
        return None
    print(f"✓ 获取到滑块验证码 ID: {captcha_id}")
    bg_image_base64 = captcha_info.get('backgroundImage', '')
    if not bg_image_base64:
        print("✗ 未找到背景图片")
        return None
    # Keep only the part after the first comma — presumably strips a
    # "data:...;base64," prefix; confirm against a real response.
    if ',' in bg_image_base64:
        bg_image_base64 = bg_image_base64.split(',')[1]
    try:
        bg_image_data = base64.b64decode(bg_image_base64)
        with open("slider_captcha_bg.png", "wb") as f:
            f.write(bg_image_data)
        print("✓ 背景图片已保存到 slider_captcha_bg.png")
        bg_image = Image.open(BytesIO(bg_image_data))
        bg_width, bg_height = bg_image.size
        print(f" 图片尺寸: {bg_width}x{bg_height}")
    except Exception as e:
        print(f"✗ 解析图片失败: {e}")
        return None
    if not CAPTCHA_RECOGNIZER_AVAILABLE:
        print("\n✗ 错误: 未安装captcha-recognizer库,无法自动识别缺口")
        print("请运行: pip install captcha-recognizer")
        return None
    slider_offset = None
    print("\n使用captcha-recognizer自动识别缺口位置...")
    try:
        # Scale the saved background to a fixed width of 300 px, keeping the
        # aspect ratio, and run recognition on the scaled copy.
        target_width = 300
        original_img = Image.open("slider_captcha_bg.png")
        original_width, original_height = original_img.size
        scale_ratio = target_width / original_width
        target_height = int(original_height * scale_ratio)
        scaled_img = original_img.resize((target_width, target_height), Image.Resampling.LANCZOS)
        scaled_path = "slider_captcha_scaled.png"
        scaled_img.save(scaled_path)
        print(f" 图片已缩放: {original_width}x{original_height} -> {target_width}x{target_height} (比例: {scale_ratio:.2f})")
        slider = Slider()
        result = slider.identify(scaled_path)
        if result:
            # The result is unpacked as (box, confidence); box[0] is used as
            # the gap's x coordinate in the scaled image.
            box, confidence = result
            if confidence > 0.5:
                gap_x_scaled = box[0]
                print(f"✓ 自动识别成功!缺口坐标(缩放图): {box}, 可信度: {confidence:.4f}")
                # gap_x_original is only reported; the submitted offset is the
                # scaled-image x minus 5.
                gap_x_original = int(gap_x_scaled / scale_ratio)
                slider_offset = int(gap_x_scaled) - 5
                print(f"✓ 坐标转换: {gap_x_scaled:.2f}px (缩放图) -> {gap_x_original}px (原始图) -> {slider_offset}px (验证用)")
            else:
                print(f"✗ 自动识别可信度过低 ({confidence:.4f} < 0.5),验证失败")
                return None
        else:
            print("✗ 自动识别失败,未返回结果")
            return None
    except Exception as e:
        print(f"✗ 自动识别失败: {e}")
        import traceback
        traceback.print_exc()
        return None
    print(f"✓ 确定缺口位置: {slider_offset}px")
    track_list = generate_slider_track(slider_offset)
    check_url = f"{BASE_URL}/student/dispatch/captcha/check"
    # The reported end time is start time plus the last track point's "t" (ms);
    # no real waiting happens here.
    track_total_duration = track_list[-1]['t'] if track_list else 2000
    end_sliding_time = start_sliding_time + datetime.timedelta(milliseconds=track_total_duration)
    # Timestamps are formatted as UTC with millisecond precision and a "Z" suffix.
    start_time_str = start_sliding_time.strftime("%Y-%m-%dT%H:%M:%S.") + f"{start_sliding_time.microsecond // 1000:03d}Z"
    end_time_str = end_sliding_time.strftime("%Y-%m-%dT%H:%M:%S.") + f"{end_sliding_time.microsecond // 1000:03d}Z"
    check_headers = create_base_headers(
        content_type="application/json;charset=UTF-8",
        extra_headers={
            "sec-ch-ua-platform": "\"Windows\"",
            "Accept": "application/json, text/javascript, */*; q=0.01",
            "sec-ch-ua": "\"Chromium\";v=\"146\", \"Not-A.Brand\";v=\"24\", \"Microsoft Edge\";v=\"146\"",
            "Accept-Encoding": "gzip, deflate, br, zstd",
            "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6",
        }
    )
    # NOTE(review): the submitted height is hard-coded to 180 and does not use
    # target_height computed above — confirm the two always agree.
    scaled_width = 300
    scaled_height = 180
    check_data = {
        "id": captcha_id,
        "data": {
            "bgImageWidth": scaled_width,
            "bgImageHeight": scaled_height,
            "sliderImageWidth": 55,
            "sliderImageHeight": scaled_height,
            "startSlidingTime": start_time_str,
            "endSlidingTime": end_time_str,
            "trackList": track_list
        }
    }
    print("正在提交验证...")
    print(f"调试信息: 缩放图片尺寸={scaled_width}x{scaled_height}, 原始尺寸={bg_width}x{bg_height}, 偏移量={slider_offset}, 轨迹点数={len(track_list)}")
    print(f"时间范围: {start_time_str} -> {end_time_str}")
    print(f"轨迹时间范围: 0 -> {track_list[-1]['t']}ms")
    import json as json_module
    # Abbreviated copy of the payload (first 3 / last 2 track points) used only
    # for the debug print below; check_data is what actually gets sent.
    debug_data = {
        "id": captcha_id,
        "data": {
            "bgImageWidth": scaled_width,
            "bgImageHeight": scaled_height,
            "sliderImageWidth": 55,
            "sliderImageHeight": scaled_height,
            "startSlidingTime": start_time_str,
            "endSlidingTime": end_time_str,
            "trackList": track_list[:3] + ["..."] + track_list[-2:]
        }
    }
    print(f"请求数据: {json_module.dumps(debug_data, ensure_ascii=False, indent=2)[:500]}")
    response = session.post(check_url, headers=check_headers, json=check_data)
    if response.status_code != 200:
        print(f"✗ 验证请求失败: {response.status_code}")
        print(f"响应内容: {response.text[:500]}")
        print(f"\n提示: 400错误通常是数据格式问题,请检查:")
        print(f" 1. 图片尺寸是否正确 (缩放: {scaled_width}x{scaled_height}, 原始: {bg_width}x{bg_height})")
        print(f" 2. 滑块宽度是否为55")
        print(f" 3. 时间格式是否为ISO 8601")
        print(f" 4. 轨迹点的t字段是否为相对时间")
        print(f" 5. Header是否完整")
        return None
    result = response.json()
    # Success requires both a truthy "success" field and code == 200.
    if result.get('success') and result.get('code') == 200:
        # NOTE(review): if the response carries no data.id this returns None,
        # which callers cannot tell apart from a failed verification.
        new_captcha_id = result.get('data', {}).get('id')
        print(f"✓ 滑块验证成功!新的captcha ID: {new_captcha_id}")
        return new_captcha_id
    else:
        print(f"✗ 验证失败: {result.get('msg', '未知错误')}")
        return None
def generate_slider_track(target_offset):
    """Generate a randomised drag track ending at ``target_offset``.

    The track starts with a ``"down"`` point at the origin, continues with
    25-35 ``"move"`` points (larger strides early, smaller ones late, with
    a vertical jitter of -1..1), adds one corrective ``"move"`` if the
    rounded strides did not land exactly on the target, and ends with an
    ``"up"`` point whose time is at least 1800 ms.

    Args:
        target_offset: Horizontal distance to cover, in pixels.

    Returns:
        A list of dicts with keys ``x``, ``y``, ``type`` and ``t``, where
        ``t`` is the elapsed time in milliseconds counted from 0.
    """
    # Per phase: (stride multiplier bounds, delay bounds in ms).
    phases = (
        ((1.2, 1.8), (50, 70)),
        ((0.8, 1.2), (60, 80)),
        ((0.4, 0.8), (70, 90)),
    )

    points = [{"x": 0, "y": 0, "type": "down", "t": 0}]
    x_pos = y_pos = clock = 0

    steps = random.randint(25, 35)
    fast_limit, steady_limit = steps * 0.7, steps * 0.9
    remaining = target_offset

    for idx in range(steps):
        phase = 0 if idx < fast_limit else (1 if idx < steady_limit else 2)
        stride_bounds, delay_bounds = phases[phase]

        # Scale the average stride still needed, but never overshoot.
        stride = remaining / (steps - idx) * random.uniform(*stride_bounds)
        stride = min(stride, remaining)
        x_pos += int(stride)
        remaining -= stride

        y_pos = random.randint(-1, 1)
        clock += random.randint(*delay_bounds)
        points.append({"x": x_pos, "y": y_pos, "type": "move", "t": clock})

    # Truncation can leave the pointer short of the target; snap to it.
    if x_pos != target_offset:
        x_pos = target_offset
        clock += random.randint(50, 80)
        points.append({"x": x_pos, "y": y_pos, "type": "move", "t": clock})

    # Release: stretch short tracks into the 1800-2000 ms window, otherwise
    # add a short pause before letting go.
    clock = random.randint(1800, 2000) if clock < 1800 else clock + random.randint(50, 150)
    points.append({"x": x_pos, "y": y_pos, "type": "up", "t": clock})
    return points
def encode_password(password):
    """Encode the password as comma-separated character codes.

    Each character is replaced by its ``ord()`` value in decimal; an empty
    string yields an empty string.
    """
    return ",".join(map(str, map(ord, password)))
def encode_username(username):
    """Encode the user name as comma-separated character codes.

    Each character is replaced by its ``ord()`` value in decimal; an empty
    string yields an empty string.
    """
    codes = (f"{ord(ch)}" for ch in username)
    return ",".join(codes)
def login():
    """Run the login flow against ``BASE_URL``.

    Steps: visit the home page (initial cookies), fetch and recognise the
    captcha, post the encoded credentials to ``/login``, and on success
    refresh the dynamic parameters via ``update_dynamic_params``.

    Returns:
        ``True`` when the server answers with ``code == 1``; ``False`` when
        the login is rejected or the response cannot be processed. The
        result of ``update_dynamic_params`` is not reflected in the return
        value.

    Raises:
        Exception: If no captcha text could be obtained.
    """
    global edu_token, edu_sign, edu_cid, user_account_id
    from urllib.parse import urlparse

    print("="*50)
    print("开始登录流程...")
    print("="*50)

    print("\n[1/4] 访问首页获取初始Cookie...")
    headers = create_base_headers()
    session.get(f"{BASE_URL}/", headers=headers)

    print("\n[2/4] 获取验证码...")
    verify_code = get_verify_code()
    if not verify_code:
        raise Exception("验证码获取失败")

    print("\n[3/4] 准备登录数据...")
    encoded_username = encode_username(USERNAME)
    encoded_password = encode_password(PASSWORD)
    login_data = {
        "UserName": encoded_username,
        "Password": encoded_password,
        "Code": verify_code
    }
    print(f"用户名(编码后): {encoded_username}")
    print(f"密码(编码后): {encoded_password}")
    print(f"验证码: {verify_code}")

    print("\n[4/4] 发送登录请求...")
    login_url = f"{BASE_URL}/login"
    extra_headers = {}
    # The Host header must name the server actually being contacted, so it is
    # derived from BASE_URL (dropping any "user:pass@" part) instead of being
    # pinned to one deployment's domain.
    host = urlparse(BASE_URL).netloc.rpartition("@")[2]
    if host:
        extra_headers["Host"] = host
    extra_headers.update({
        "Accept": "application/json, text/javascript, */*; q=0.01",
        "eduRefUrl": f"{BASE_URL}/",
    })
    login_headers = create_base_headers(
        content_type="application/x-www-form-urlencoded; charset=UTF-8",
        extra_headers=extra_headers,
    )
    response = session.post(login_url, headers=login_headers, data=login_data)
    print(f"\n登录响应状态码: {response.status_code}")
    print(f"登录响应内容: {response.text}")

    try:
        result = response.json()
        if result.get("code") == 1:
            print("\n✓ 登录成功!")
            if "data" in result and "eduCId" in result["data"]:
                edu_cid = result["data"]["eduCId"]
                print(f"eduCId: {edu_cid}")
            print("\n当前Cookie:")
            for cookie in session.cookies:
                print(f" {cookie.name}: {cookie.value}")
            update_dynamic_params()
            return True
        else:
            error_msg = result.get("message", "未知错误")
            print(f"\n✗ 登录失败: {error_msg}")
            return False
    except Exception as e:
        print(f"\n✗ 解析登录响应失败: {e}")
        return False
def update_dynamic_params():
    """Scrape the ``/student/`` page for the dynamic request parameters.

    Updates the module-level ``edu_token``, ``edu_cid``, ``user_account_id``
    and ``edu_sign`` from the page HTML, which is also saved to
    ``student_page_debug.html``. ``edu_token`` and ``user_account_id`` are
    reset to ``None`` when not found; ``edu_cid`` and ``edu_sign`` keep
    their previous value in that case.

    Returns:
        ``True`` when ``edu_token``, ``edu_cid`` and ``user_account_id`` are
        all set afterwards; ``False`` otherwise or when the page request
        does not return HTTP 200.
    """
    global edu_token, edu_sign, edu_cid, user_account_id
    print("\n正在从学生主页获取动态参数...")

    page_headers = create_base_headers(
        extra_headers={
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8",
        }
    )
    response = session.get(f"{BASE_URL}/student/", headers=page_headers)
    if response.status_code != 200:
        print(f"✗ 访问学生主页失败: {response.status_code}")
        print(f"响应内容: {response.text[:200]}")
        return False

    page = response.text
    with open("student_page_debug.html", "w", encoding="utf-8") as dump_file:
        dump_file.write(page)
    print("已保存页面HTML到 student_page_debug.html")

    import re

    def first_capture(patterns, flags=0):
        """Return group 1 of the first pattern that matches the page, else None."""
        for candidate in patterns:
            hit = re.search(candidate, page, flags)
            if hit:
                return hit.group(1)
        return None

    # --- eduToken -----------------------------------------------------------
    edu_token = first_capture((
        r'"eduToken"\s*:\s*"([^"]+)"',
        r'eduToken\s*:\s*["\']([^"\']+)["\']',
    ))
    if edu_token:
        print(f"✓ eduToken: {edu_token}")
    else:
        print("✗ 未找到 eduToken")
        print("提示: 请检查 student_page_debug.html 文件确认页面结构")

    # --- eduCId (a value obtained earlier is kept when the page has none) ---
    scraped_cid = first_capture((
        r'cxt\.eduCId\s*=\s*["\']([^"\']+)["\']',
        r'"eduCId"\s*:\s*"([^"]+)"',
    ))
    if scraped_cid:
        edu_cid = scraped_cid
        print(f"✓ eduCId: {edu_cid}")
    else:
        print("✗ 未找到 eduCId")

    # --- userId / userAccountId ----------------------------------------------
    user_account_id = first_capture((
        r'"userId"\s*:\s*"([^"]+)"',
        r'userId\s*:\s*["\']([^"\']+)["\']',
        r'"userAccountId"\s*:\s*"([^"]+)"',
    ))
    if user_account_id:
        print(f"✓ userId (userAccountId): {user_account_id}")
    else:
        print("✗ 未找到 userId")
        print("提示: 请检查 student_page_debug.html 文件确认字段名")

    # --- informational fields: printed only, not stored -----------------------
    display_name = first_capture((
        r'"userName"\s*:\s*"([^"]+)"',
        r'userName\s*:\s*["\']([^"\']+)["\']',
    ))
    if display_name is not None:
        print(f"✓ 用户名: {display_name}")

    station = first_capture((
        r'"stationId"\s*:\s*"([^"]+)"',
        r'stationId\s*:\s*["\']([^"\']+)["\']',
    ))
    if station is not None:
        print(f"✓ stationId: {station}")

    # --- eduSign (case-insensitive search) -----------------------------------
    scraped_sign = first_capture(
        (
            r'"?eduSign"?\s*[:=]\s*["\']([^"\']+)["\']',
            r'edusign\s*[:=]\s*["\']([^"\']+)["\']',
        ),
        re.IGNORECASE,
    )
    if scraped_sign is not None:
        edu_sign = scraped_sign
        print(f"✓ eduSign: {edu_sign}")
    if not edu_sign:
        print("⚠ 未在页面中找到 eduSign")
        print("提示: eduSign 可能需要从其他接口获取或手动配置")

    if edu_token and edu_cid and user_account_id:
        print("\n✓ 所有动态参数获取成功!")
        return True

    print("\n✗ 部分参数获取失败,请检查页面结构是否变化")
    print(f" edu_token: {'✓' if edu_token else '✗'}")
    print(f" edu_cid: {'✓' if edu_cid else '✗'}")
    print(f" user_account_id: {'✓' if user_account_id else '✗'}")
    return False
def get_course_list():
    """Fetch the course list for the logged-in student.

    Requires ``edu_token``, ``edu_cid`` and ``user_account_id`` to be set.
    When the server answers 403 with a body containing ``403.5`` or
    ``robot``, the slider captcha is solved via ``handle_slider_captcha``
    and the request is retried once.

    Returns:
        The list found under ``items`` in the JSON response, or an empty
        list on any failure (missing parameters, captcha failure, other
        403, non-200 status, unparsable or item-less response).
    """
    from urllib.parse import urlparse

    print("\n" + "="*50)
    print("获取课程列表...")
    print("="*50)

    if not all([edu_token, edu_cid, user_account_id]):
        print("错误: 动态参数未完全获取,请先确保登录成功")
        return []

    url = f"{BASE_URL}/student/student/coursestudy/getlist"
    extra_headers = {}
    # The Host header must name the server actually being contacted, so it is
    # derived from BASE_URL (dropping any "user:pass@" part) instead of being
    # pinned to one deployment's domain.
    host = urlparse(BASE_URL).netloc.rpartition("@")[2]
    if host:
        extra_headers["Host"] = host
    extra_headers.update({
        "metadataCode": "Student_StudentHome",
        "eduRefUrl": f"{BASE_URL}/student/",
    })
    headers = create_api_headers(
        content_type="application/json",
        extra_headers=extra_headers,
    )
    # Compact separators: the body is sent as a pre-serialised string.
    data = json.dumps({"data": "aggregation"}, separators=(',', ':'))
    response = session.post(url, headers=headers, data=data)
    print(f"响应状态码: {response.status_code}")

    if response.status_code == 403:
        response_text = response.text
        print(f"收到403错误,响应内容: {response_text[:200]}")
        if '403.5' in response_text or 'robot' in response_text.lower():
            print("\n检测到需要滑块验证(403.5)")
            print("开始处理滑块验证码...\n")
            captcha_id = handle_slider_captcha()
            if captcha_id:
                print("✓ 滑块验证成功,重新获取课程列表...\n")
                # Single retry with the same request after the captcha passes.
                response = session.post(url, headers=headers, data=data)
                print(f"重试后响应状态码: {response.status_code}")
            else:
                print("✗ 滑块验证失败")
                return []
        else:
            print("403错误,但不是滑块验证问题")
            return []

    if response.status_code == 200:
        try:
            result = response.json()
            if "items" in result:
                courses = result["items"]
                print(f"\n共获取到 {len(courses)} 门课程")
                # Only the first five courses are printed; all are returned.
                for i, course in enumerate(courses[:5], 1):
                    print(f"\n[{i}] {course.get('versionName', 'N/A')}")
                    print(f" courseVersionID: {course.get('courseVersionID', 'N/A')}")
                    print(f" teachplanCourseVersionId: {course.get('teachplanCourseVersionId', 'N/A')}")
                    print(f" sign: {course.get('sign', 'N/A')}")
                    print(f" 学习进度: {course.get('coursewareLearningProgress', 'N/A')}")
                return courses
            else:
                print("未找到课程数据")
                return []
        except Exception as e:
            print(f"解析课程列表失败: {e}")
            print(f"响应内容: {response.text[:500]}")
            return []
    else:
        print(f"请求失败: {response.text}")
        return []
def send_study_duration_request(config):
    """Post one study-duration record.

    Args:
        config: Mapping with an ``"edurefurl"`` entry (sent as the
            ``eduRefUrl`` header) and a ``"data"`` entry (sent as the
            ``data`` field of the JSON body).

    Returns:
        The response object returned by ``session.post``.
    """
    from urllib.parse import urlparse

    extra_headers = {}
    # The Host header must name the server actually being contacted, so it is
    # derived from BASE_URL (dropping any "user:pass@" part) instead of being
    # pinned to one deployment's domain.
    host = urlparse(BASE_URL).netloc.rpartition("@")[2]
    if host:
        extra_headers["Host"] = host
    extra_headers.update({
        "Accept": "application/json",
        "eduRefUrl": config["edurefurl"],
    })
    headers = create_api_headers(
        content_type="application/json",
        extra_headers=extra_headers,
    )
    # Compact separators: the body is sent as a pre-serialised string.
    data_string = json.dumps({"data": config["data"]}, separators=(',', ':'))
    response = session.post(
        url=f"{BASE_URL}/student/student/coursestudyrecord/adddurationpc",
        headers=headers,
        data=data_string
    )
    return response
def build_study_config_from_course(course):
    """Build the study-request configuration for one course entry.

    Args:
        course: Mapping read with ``.get`` for ``courseVersionID``,
            ``teachplanCourseVersionId``, ``sign`` and ``versionName``.

    Returns:
        A dict with ``edurefurl``, ``edutoken``, ``edusign`` and ``data``
        entries, or ``None`` when a required course field or the
        module-level ``user_account_id`` is missing.
    """
    import hashlib
    from urllib.parse import quote

    version_name = course.get('versionName', '')
    course_version_id, teachplan_id, sign = (
        course.get(field)
        for field in ('courseVersionID', 'teachplanCourseVersionId', 'sign')
    )

    if not (course_version_id and teachplan_id and sign):
        print(f"警告: 课程 {version_name} 缺少必要参数")
        return None
    if not user_account_id:
        print("错误: user_account_id 未获取")
        return None

    # Only the version name is percent-encoded; the other values are inserted as-is.
    query = "&".join((
        f"courseVersionId={course_version_id}",
        f"teachplanCourseVersionId={teachplan_id}",
        f"sign={sign}",
        f"userAccountId={user_account_id}",
        "syncPracticePiwik=1",
        "piwikSiteId=750",
        f"versionName={quote(version_name)}",
    ))
    edurefurl = f"{BASE_URL}/student/videolearning.html#Subpage/StudentVersionVideo?{query}"

    # Token layout: md5("<courseVersionId>_<ms timestamp>_<userAccountId>")|<ms timestamp>
    timestamp = str(int(time.time() * 1000))
    digest = hashlib.md5(
        f"{course_version_id}_{timestamp}_{user_account_id}".encode()
    ).hexdigest()

    return {
        "edurefurl": edurefurl,
        "edutoken": edu_token or "placeholder",
        "edusign": "placeholder",
        "data": {
            "courseVersionId": str(course_version_id),
            "studyDuration": 30,
            "token": f"{digest}|{timestamp}",
        },
    }
def main():
    """Log in, fetch the course list and report study time in an endless loop.

    Exits early when login fails or no courses are returned. Otherwise the
    first four courses are cycled through forever: one duration request per
    course with a 2-second pause after each, then a 30-second pause between
    rounds.
    """
    banner = "#"*60
    print("\n" + banner)
    print("# 自动化学习系统")
    print(banner + "\n")

    if not login():
        print("\n登录失败,程序退出")
        return

    courses = get_course_list()
    if not courses:
        print("\n未获取到课程列表,程序退出")
        return

    section_rule = "="*50
    print("\n" + section_rule)
    print("开始模拟学习...")
    print(section_rule)

    def study_once(course):
        """Send one duration record for a course and print the outcome."""
        print(f"\n>>> 正在学习: {course.get('versionName', 'Unknown')}")
        config = build_study_config_from_course(course)
        if not config:
            print(" 跳过: 无法构建配置")
            return
        try:
            response = send_study_duration_request(config)
            print(f" 状态码: {response.status_code}")
            print(f" 响应: {response.text}")
        except Exception as e:
            print(f" 请求失败: {e}")

    study_courses = courses[:4]
    round_rule = "-"*50
    while True:
        for course in study_courses:
            study_once(course)
            time.sleep(2)
        print("\n" + round_rule)
        print("本轮完成,等待30秒后继续...")
        print(round_rule)
        time.sleep(30)
if __name__ == "__main__":
    import traceback

    # Script entry point: Ctrl+C ends quietly, any other error is reported
    # together with its traceback.
    try:
        main()
    except KeyboardInterrupt:
        print("\n\n程序被用户中断")
    except Exception as e:
        print(f"\n程序异常: {e}")
        traceback.print_exc()
|