1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
| import os
import subprocess
from urllib.parse import urlparse
from pyrogram import Client, filters, types, enums
# Telegram bot token
API_ID = # 替换成您的API ID
API_HASH = '' # 替换成您的API Hash
BOT_TOKEN = '' # 替换成您的Bot Token
# 定义Pyrogram客户端
app = Client('my_bot', api_id=API_ID, api_hash=API_HASH, bot_token=BOT_TOKEN)
# app = Client('my_bot')
AUTHORIZED_USER="" #替换为您的用户ID
MAX_LOG_LINES = 100 # 最大保留的日志行数
def log_output(output):
# 读取当前日志内容,最多保留 MAX_LOG_LINES 行
with open('download_log.txt', 'r') as log_file:
lines = log_file.readlines()
lines = lines[-MAX_LOG_LINES + 1:] # 保留最新的 MAX_LOG_LINES - 1 行
# 将新的日志追加到列表末尾
lines.append(output + '\n')
# 将新的日志写入文件
with open('download_log.txt', 'w') as log_file:
log_file.writelines(lines)
def private_use(func):
def wrapper(client: Client, message: types.Message):
chat_id = getattr(message.from_user, "id", None)
# message type check
if message.chat.type != enums.ChatType.PRIVATE and not message.text.lower().startswith("/ytdl"):
logging.debug("%s, it's annoying me...🙄️ ", message.text)
return
# authorized users check
if AUTHORIZED_USER:
users = [int(i) for i in AUTHORIZED_USER.split(",")]
else:
users = []
if users and chat_id and chat_id not in users:
message.reply_text("This is a private bot.", quote=True)
return
return func(client, message)
return wrapper
@app.on_message(filters.command(["start"]))
@private_use
def start_command(client, message):
message.reply_text("The bot is running.")
@app.on_message(filters.text & filters.private)
@private_use
def handle_text_message(client, message):
video_link = message.text.strip()
domain = urlparse(video_link).hostname
if domain in ('www.youtube.com', 'youtu.be', 'youtube.com'):
paths = '<your_cloud_path>/youtube'
cookies = None
elif domain in ('www.bilibili.com', 'b23.tv'):
paths = '<your_cloud_path>/bilibili'
cookies = '<your_cookies_path>'
else:
message.reply_text("Invalid video link. Please provide a link from YouTube or Bilibili.")
return
command = [
'yt-dlp',
'-f', 'bestvideo+bestaudio/best',
video_link,
'--paths', paths,
'--write-subs',
'--write-auto-subs',
'--sub-langs', 'zh.*',
'--merge-output-format', 'mkv',
'--external-downloader', 'aria2c',
'--external-downloader-args', '-x 5 -k 4M'
]
if cookies:
command.extend(['--cookies', cookies])
# 发送 "Downloading in progress." 消息
downloading_message = message.reply_text("Downloading in progress.", reply_to_message_id=message.id)
process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
while True:
output = process.stdout.readline().decode().strip()
if output == '' and process.poll() is not None:
break
log_output(output)
if "ERROR" in output:
downloading_message.delete()
message.reply_text("An error occurred during video download.", reply_to_message_id=message.id)
break
elif "has already been downloaded" in output:
downloading_message.delete()
message.reply_text("Video has already been downloaded.", reply_to_message_id=message.id)
break
elif "Deleting original file" in output:
downloading_message.delete()
message.reply_text("Video download complete.", reply_to_message_id=message.id)
break
elif "are missing" in output:
downloading_message.delete()
message.reply_text("Bilibili cookies expired, please update.", reply_to_message_id=message.id)
process.terminate() # 停止下载进程
break
if __name__ == '__main__':
app.run()
|