利用青龙面板自动从Telegram频道下载文件并筛选输出IP库
前言
接上篇教程《自动从Telegram频道下载CSV文件并提取443端口IP地址》,因为在github actions自动运行,所以Telegram每次ip都会变动提醒,今天有时间把脚本修改了一下兼容青龙面板运行。
运行效果

部署方法
1、 进青龙面板,依赖管理里面安装pandas和telethon。如下图:

2、 进环境变量里面添加变信息信息。变量名如下:
| 变量名 | 说明 | 示例 |
|---|---|---|
TELEGRAM_API_ID |
Telegram API ID | 1234567 |
TELEGRAM_API_HASH |
Telegram API Hash | abcdef1234567890 |
TELEGRAM_PHONE |
手机号码(国际格式) | +861234567890 |
TELEGRAM_CHANNEL |
目标频道用户名 | @your_channel |

3、 进脚本管理里面新建一个名为telegram_ip_extractor.py的脚本。把最下面的脚本粘贴进去。如下图:

4、 点左侧的定时任务,创建任务。任务名称随意,命令脚本task telegram_ip_extractor.py,定时规则自己定义。

5、 手动运行一次没有报错,就可以进青龙目录下查看输出的txt文件,我这里是在群晖上搭建的Docker。所以直接进群晖查看,然后共享文件供openwrt路由优选调用。

6、 最后把共享链接导入到优选脚本里面使用即可。

写在最后
第一次建议本地运行后输出telegram_session,输出文件在temp目录下。然后再导入到青龙,。
*附上本地运行获取telegram_session的脚本。
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# @Author: Telegram 本地设置脚本
# @Date: 2024-01-01
import os
import asyncio
from telethon import TelegramClient
import tempfile
import sys
def setup_telegram():
print("=== Telegram 本地设置脚本 ===")
print("此脚本将在本地完成Telegram验证并生成session文件")
print("完成后可将session文件用于青龙面板\n")
# 获取用户输入
api_id = input("请输入 TELEGRAM_API_ID: ").strip()
api_hash = input("请输入 TELEGRAM_API_HASH: ").strip()
phone_number = input("请输入 TELEGRAM_PHONE (国际格式,如 +861234567890): ").strip()
if not all([api_id, api_hash, phone_number]):
print("错误: 所有字段都必须填写!")
return
print(f"\n配置信息:")
print(f"API ID: {api_id}")
print(f"API Hash: {api_hash}")
print(f"手机号: {phone_number}")
print("\n正在启动验证流程...")
async def authenticate():
# 使用临时目录存储session文件
temp_dir = tempfile.gettempdir()
session_file = os.path.join(temp_dir, 'telegram_session')
print(f"Session文件将保存在: {session_file}")
client = TelegramClient(session_file, int(api_id), api_hash)
try:
await client.start(phone=phone_number)
print("✅ 验证成功! Session文件已生成")
print(f"✅ Session文件位置: {session_file}")
# 测试连接
me = await client.get_me()
print(f"✅ 登录账号: {me.first_name} ({me.phone})")
print("\n🎉 设置完成! 现在您可以在青龙面板中使用以下环境变量:")
print(f"TELEGRAM_API_ID={api_id}")
print(f"TELEGRAM_API_HASH={api_hash}")
print(f"TELEGRAM_PHONE={phone_number}")
print("\n💡 提示: Session文件会自动用于后续登录,无需再次验证")
except Exception as e:
print(f"❌ 验证失败: {e}")
finally:
await client.disconnect()
# 运行异步函数
asyncio.run(authenticate())
if __name__ == "__main__":
setup_telegram()
最后附上青龙脚本
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# @Author: Telegram IP提取脚本 - 青龙面板修复版
# @Last Modified: 2025-10-18
import os
import re
import asyncio
from telethon import TelegramClient
import logging
import csv
import sys
from datetime import datetime, timedelta, timezone
print("======================================")
print("Telegram IP提取脚本 - 青龙面板修复版")
print("开始执行时间:", datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
print("======================================")
# 青龙面板环境变量配置
API_ID = os.getenv('TELEGRAM_API_ID')
API_HASH = os.getenv('TELEGRAM_API_HASH')
PHONE_NUMBER = os.getenv('TELEGRAM_PHONE')
CHANNEL_USERNAME = os.getenv('TELEGRAM_CHANNEL')
# 自动检测青龙面板路径
def get_ql_paths():
"""自动检测青龙面板路径"""
possible_paths = [
'/ql/scripts/', # 标准路径
'/ql/data/scripts/', # 新版路径
'/ql/config/scripts/', # 配置路径
'./', # 当前目录
]
for base_path in possible_paths:
if os.path.exists(base_path):
return base_path
return './' # 默认当前目录
# 获取基础路径
BASE_PATH = get_ql_paths()
print(f"📁 检测到基础路径: {BASE_PATH}")
# 文件路径配置
DOWNLOAD_FOLDER = os.path.join(BASE_PATH, 'tg')
IP_FILE = os.path.join(BASE_PATH, 'tg/ip.txt')
HK_IP_FILE = os.path.join(BASE_PATH, 'tg/hkip.txt')
SG_IP_FILE = os.path.join(BASE_PATH, 'tg/sgip.txt')
# Session文件路径 - 尝试多个可能的位置
SESSION_PATHS = [
os.path.join(BASE_PATH, 'telegram_session'),
os.path.join(BASE_PATH, 'telegram_session.session'),
'/ql/scripts/telegram_session',
'/ql/data/scripts/telegram_session',
'/ql/scripts/telegram_session.session',
'/ql/data/scripts/telegram_session.session',
'./telegram_session',
'./telegram_session.session'
]
# 设置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[logging.StreamHandler(sys.stdout)]
)
logger = logging.getLogger(__name__)
def find_session_file():
"""查找session文件"""
for session_path in SESSION_PATHS:
if os.path.exists(session_path):
logger.info(f"✅ 找到session文件: {session_path}")
return session_path
# 如果没有找到,显示所有可能的路径
logger.error("❌ 未找到session文件,请检查以下可能的位置:")
for path in SESSION_PATHS:
logger.error(f" - {path}")
return None
def check_environment():
"""检查环境变量配置"""
missing_vars = []
if not API_ID:
missing_vars.append('TELEGRAM_API_ID')
if not API_HASH:
missing_vars.append('TELEGRAM_API_HASH')
if not PHONE_NUMBER:
missing_vars.append('TELEGRAM_PHONE')
if not CHANNEL_USERNAME:
missing_vars.append('TELEGRAM_CHANNEL')
if missing_vars:
logger.error(f"❌ 缺少环境变量: {', '.join(missing_vars)}")
return False
# 查找session文件
session_file = find_session_file()
if not session_file:
return False
logger.info("✅ 环境检查通过")
return True
class TelegramDownloader:
def __init__(self, api_id, api_hash, phone_number, channel_username):
self.session_file = find_session_file()
if not self.session_file:
raise ValueError("未找到session文件")
self.client = TelegramClient(self.session_file, api_id, api_hash)
self.phone_number = phone_number
self.channel_username = channel_username
logger.info(f"📁 使用session文件: {self.session_file}")
async def start(self):
"""启动客户端"""
try:
await self.client.start(phone=self.phone_number)
logger.info("✅ Telegram客户端启动成功")
return True
except Exception as e:
logger.error(f"❌ 启动失败: {e}")
return False
async def download_recent_csv_files(self, download_folder):
"""下载最近发布的CSV文件"""
try:
os.makedirs(download_folder, exist_ok=True)
logger.info(f"📡 正在连接频道: {self.channel_username}")
channel = await self.client.get_entity(self.channel_username)
logger.info(f"✅ 成功连接到频道: {channel.title}")
except Exception as e:
logger.error(f"❌ 连接频道失败: {e}")
return []
# 获取最近3天的文件
utc_now = datetime.now(timezone.utc)
three_days_ago = utc_now - timedelta(days=3)
logger.info(f"🔍 正在查找最近3天发布的CSV文件...")
downloaded_files = []
try:
async for message in self.client.iter_messages(channel, limit=200):
if message.media and hasattr(message.media, 'document'):
document = message.media.document
filename = None
for attr in document.attributes:
if hasattr(attr, 'file_name'):
filename = attr.file_name
break
if filename and filename.lower().endswith('.csv'):
message_date = message.date
if message_date >= three_days_ago:
logger.info(f"📄 找到CSV文件: {filename}")
file_path = os.path.join(download_folder, filename)
# 无论文件是否存在都重新下载,确保获取最新版本
try:
await self.client.download_media(message, file=file_path)
logger.info(f"✅ 下载成功: {filename}")
downloaded_files.append(file_path)
except Exception as e:
logger.error(f"❌ 下载失败 {filename}: {e}")
logger.info(f"📊 总共找到 {len(downloaded_files)} 个CSV文件")
return downloaded_files
except Exception as e:
logger.error(f"❌ 获取消息时出错: {e}")
return []
def find_region_preferred_files(self, csv_files):
"""查找优先处理的区域文件"""
hk_files = []
sg_files = []
other_files = []
for file_path in csv_files:
filename = os.path.basename(file_path)
if re.match(r'^IataHK\.csv-.*-IP\.csv$', filename):
hk_files.append(file_path)
logger.info(f"🇭🇰 找到HK优选文件: {filename}")
elif re.match(r'^IataSG\.csv-.*-IP\.csv$', filename):
sg_files.append(file_path)
logger.info(f"🇸🇬 找到SG优选文件: {filename}")
else:
other_files.append(file_path)
logger.info(f"📁 文件统计: HK={len(hk_files)}, SG={len(sg_files)}, 其他={len(other_files)}")
return hk_files, sg_files, other_files
def extract_443_ips_from_csv(self, csv_file_path):
"""从CSV文件中提取443端口IP"""
if not os.path.exists(csv_file_path):
return []
ip_addresses = set()
try:
with open(csv_file_path, 'r', encoding='utf-8', errors='ignore') as file:
sample = file.read(1024)
file.seek(0)
delimiter = ','
if ';' in sample and ',' not in sample:
delimiter = ';'
elif '\t' in sample:
delimiter = '\t'
reader = csv.reader(file, delimiter=delimiter)
headers = None
for row_num, row in enumerate(reader, 1):
if not row:
continue
if row_num == 1:
headers = [header.strip().lower() for header in row]
continue
# 查找端口列
port_column_index = None
for i, header in enumerate(headers):
if header in ['port', '端口', 'port_number', '端口号', 'dstport', 'portid']:
port_column_index = i
break
if port_column_index is None:
for i, header in enumerate(headers):
if 'port' in header:
port_column_index = i
break
# 检查是否为443端口
is_443_port = False
if port_column_index is not None and port_column_index < len(row):
port_value = str(row[port_column_index]).strip()
if port_value == '443':
is_443_port = True
if is_443_port:
# 查找IP列
ip_column_index = None
for i, header in enumerate(headers):
if header in ['ip', 'ip地址', 'ip_address', 'address', '地址', 'dstip', 'ipaddr']:
ip_column_index = i
break
if ip_column_index is None:
ip_column_index = 0
if ip_column_index < len(row):
ip_value = str(row[ip_column_index]).strip()
ip_match = re.search(r'\b(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\b', ip_value)
if ip_match:
ip = ip_match.group()
if self.is_valid_ip(ip):
ip_addresses.add(ip)
logger.info(f"✅ 从 {os.path.basename(csv_file_path)} 提取到 {len(ip_addresses)} 个443端口IP")
except Exception as e:
logger.error(f"❌ 读取CSV文件时出错: {e}")
return list(ip_addresses)
def is_valid_ip(self, ip):
"""验证IP地址格式"""
try:
parts = ip.split('.')
if len(parts) != 4:
return False
return all(0 <= int(part) <= 255 for part in parts if part.isdigit())
except:
return False
def save_ips_to_file(self, ip_list, output_file):
"""保存IP地址到文件 - 始终覆盖输出"""
try:
# 确保输出目录存在
os.makedirs(os.path.dirname(output_file), exist_ok=True)
# 无论IP列表是否为空,都创建或覆盖文件
with open(output_file, 'w', encoding='utf-8') as f:
for ip in sorted(ip_list):
f.write(ip + '\n')
# 记录文件状态
if len(ip_list) > 0:
logger.info(f"💾 保存 {len(ip_list)} 个IP到 {output_file}")
else:
logger.info(f"📝 创建空文件 {output_file} (无IP数据)")
return True
except Exception as e:
logger.error(f"❌ 保存文件失败: {e}")
return False
async def close(self):
"""关闭客户端"""
await self.client.disconnect()
async def main():
# 检查环境
if not check_environment():
print("## 错误: 环境配置检查失败")
return
logger.info(f"🎯 目标频道: {CHANNEL_USERNAME}")
try:
# 初始化下载器
downloader = TelegramDownloader(API_ID, API_HASH, PHONE_NUMBER, CHANNEL_USERNAME)
# 启动客户端
logger.info("🚀 正在启动Telegram客户端...")
success = await downloader.start()
if not success:
logger.error("❌ 无法启动Telegram客户端")
return
# 下载CSV文件
csv_files = await downloader.download_recent_csv_files(DOWNLOAD_FOLDER)
# 无论是否找到CSV文件,都创建输出文件
all_ips = set()
hk_ips = set()
sg_ips = set()
if csv_files:
# 分离区域文件
hk_files, sg_files, other_files = downloader.find_region_preferred_files(csv_files)
# 提取所有443端口IP
for file_path in csv_files:
ips = downloader.extract_443_ips_from_csv(file_path)
all_ips.update(ips)
# 提取HK IP
for file_path in hk_files:
ips = downloader.extract_443_ips_from_csv(file_path)
hk_ips.update(ips)
# 提取SG IP
for file_path in sg_files:
ips = downloader.extract_443_ips_from_csv(file_path)
sg_ips.update(ips)
else:
logger.info("ℹ️ 未找到CSV文件,将创建空的输出文件")
# 始终保存文件(覆盖更新)
downloader.save_ips_to_file(list(all_ips), IP_FILE)
downloader.save_ips_to_file(list(hk_ips), HK_IP_FILE)
downloader.save_ips_to_file(list(sg_ips), SG_IP_FILE)
# 输出结果
print(f"\n## 任务执行完成")
print(f"## 提取结果汇总")
print(f"- 目标频道: {CHANNEL_USERNAME}")
print(f"- 处理文件数: {len(csv_files)}")
print(f"- 总443端口IP: {len(all_ips)} 个")
print(f"- HK区域443端口IP: {len(hk_ips)} 个")
print(f"- SG区域443端口IP: {len(sg_ips)} 个")
print(f"- 输出文件: {IP_FILE}, {HK_IP_FILE}, {SG_IP_FILE}")
except Exception as e:
logger.error(f"❌ 初始化失败: {e}")
finally:
try:
await downloader.close()
logger.info("👋 Telegram客户端已关闭")
except:
pass
if __name__ == "__main__":
asyncio.run(main())
print("======================================")
print("脚本执行完毕")
print("======================================")
