1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
import subprocess
import os
import time
import threading
import re
from faster_whisper import WhisperModel
# Select the inference device: the GPU is used unless CUDA_VISIBLE_DEVICES
# is explicitly set to "-1".
if os.environ.get("CUDA_VISIBLE_DEVICES") == "-1":
    device = "cpu"
else:
    device = "cuda"
print(f"目前使用裝置: {device}")

# Load the Whisper model once at import time; float16 is requested on the GPU
# and int8 on the CPU.
model = WhisperModel(
    "medium",
    device=device,
    compute_type="int8" if device != "cuda" else "float16",
)
# Make a string safe to use as a file name
def sanitize_filename(name):
    """Return *name* converted into a safe file-name stem.

    Characters that are illegal in file names on common platforms
    (``\\ / : * ? " < > |`` and line breaks) are replaced by ``_``, the
    result is limited to 100 characters, and surrounding whitespace and
    trailing dots are removed.

    Args:
        name: Arbitrary title text.

    Returns:
        The sanitized stem, or ``"untitled"`` when nothing usable is left
        (so callers never build a path such as ``".txt"``).
    """
    cleaned = re.sub(r'[\\/:*?"<>|\n\r]', '_', name)
    # Truncate BEFORE trimming the tail: cutting at 100 characters can expose
    # a dot or space that would otherwise end the file name.
    cleaned = cleaned.strip()[:100].rstrip('. ')
    return cleaned or "untitled"
def get_playlist_title(playlist_url):
    """Return the title of the playlist at *playlist_url*.

    Only the first playlist entry is queried, in flat mode, so yt-dlp does
    not have to extract every video just to report the playlist title.

    Args:
        playlist_url: URL of the playlist.

    Returns:
        The playlist title, or ``"playlist"`` when yt-dlp cannot be started,
        prints nothing, or prints its ``NA`` placeholder for a missing field.
    """
    try:
        result = subprocess.run([
            "yt-dlp",
            "--flat-playlist",
            "--playlist-items", "1",
            "--skip-download",
            "--no-warnings",
            "--print", "playlist_title",
            playlist_url,
        ], capture_output=True, text=True)
    except Exception as e:
        print(f"⚠️ 無法取得播放清單標題:{e}")
        return "playlist"

    lines = [line.strip() for line in result.stdout.splitlines() if line.strip()]
    title = lines[0] if lines else "playlist"
    if title == "NA":
        # yt-dlp prints "NA" for fields it could not resolve.
        title = "playlist"
    print(f"DEBUG 播放清單標題:{repr(title)}")
    return title
def is_playlist_url(url):
    """Return True when yt-dlp lists more than one video id for *url*.

    The ids are obtained with ``--flat-playlist --print id``. A single id
    (a plain video, or a one-item playlist) and empty output both count as
    "not a playlist".
    """
    listing = subprocess.run(
        ["yt-dlp", "--flat-playlist", "--print", "id", url],
        capture_output=True,
        text=True,
    )
    id_text = listing.stdout.strip()
    # After stripping, a remaining newline means at least two lines of ids.
    return '\n' in id_text
# Fetch the video title
def get_video_title(video_url):
    """Return the title yt-dlp reports for *video_url*.

    Args:
        video_url: URL of the video.

    Returns:
        The stripped title text, or ``"unknown_title"`` when yt-dlp cannot
        be started or prints no title (for example because extraction
        failed), so callers never receive an empty string.
    """
    try:
        result = subprocess.run(
            ["yt-dlp", "--get-title", video_url],
            capture_output=True,
            text=True,
        )
    except Exception as e:
        print(f"⚠️ 無法取得標題:{e}")
        return "unknown_title"

    title = result.stdout.strip()
    if not title:
        # yt-dlp ran but produced no title; surface why instead of silently
        # returning an empty string.
        detail = result.stderr.strip() or f"exit code {result.returncode}"
        print(f"⚠️ 無法取得標題:{detail}")
        return "unknown_title"
    return title
# Fetch the video length in seconds
def get_video_duration(video_url):
    """Return the duration of *video_url* in whole seconds.

    Args:
        video_url: URL of the video.

    Returns:
        The duration truncated to an ``int``, or ``0`` when yt-dlp cannot be
        started or does not print a number (empty output, ``NA``, ...).
    """
    try:
        result = subprocess.run(
            ["yt-dlp", "--print", "duration", video_url],
            capture_output=True,
            text=True,
        )
        lines = result.stdout.strip().splitlines()
        if not lines:
            return 0
        # Go through float() so fractional durations such as "123.7" are
        # accepted instead of falling back to 0.
        return int(float(lines[0].strip()))
    except (OSError, ValueError, OverflowError):
        # Only expected failures are swallowed; KeyboardInterrupt and
        # SystemExit must still propagate.
        return 0
# Download the audio track
def download_audio(url, output_path):
    """Download the audio of *url* as MP3 to *output_path* using yt-dlp.

    Args:
        url: Video URL handed to yt-dlp.
        output_path: Destination file path. Its parent directory is created
            when the path has one.

    Returns:
        ``True`` when yt-dlp exits with status 0, ``False`` otherwise.
    """
    target_dir = os.path.dirname(output_path)
    if target_dir:
        # os.makedirs("") raises, so only create a directory when the path
        # actually contains one.
        os.makedirs(target_dir, exist_ok=True)
    print(f"下載音訊中:{url}")
    # The -o value is an output *template*; a literal "%" must be written as
    # "%%" or yt-dlp may treat it as the start of a field.
    output_template = output_path.replace("%", "%%")
    result = subprocess.run([
        "yt-dlp", "-x", "--audio-format", "mp3", "-o", output_template, url
    ], capture_output=True, text=True)
    print(result.stdout)
    print(result.stderr)
    if result.returncode != 0:
        print(f"[錯誤] 下載失敗:{url}")
        return False
    return True
# Transcribe while reporting progress
def transcribe_with_progress(file_path, language='zh'):
    """Transcribe *file_path* with the module-level ``model``.

    A background thread prints the elapsed time every 5 seconds while the
    transcription is running.

    Args:
        file_path: Path of the audio file to transcribe.
        language: Language code passed to ``model.transcribe``.

    Returns:
        A ``(segments, info)`` tuple. ``segments`` is the fully materialized
        list of segments. On any transcription error the function prints
        the error and returns ``([], {})``.
    """
    stop_flag = threading.Event()
    start_time = time.time()

    def progress_timer():
        while not stop_flag.is_set():
            elapsed = time.time() - start_time
            print(f"[進度] 轉錄中... 已進行 {int(elapsed)} 秒")
            # Wait on the event rather than sleeping: the thread wakes as
            # soon as the flag is set, so join() below does not stall for
            # the rest of a 5-second sleep.
            stop_flag.wait(5)

    print(f"開始轉錄:{file_path}")
    timer_thread = threading.Thread(target=progress_timer)
    timer_thread.start()
    try:
        segments_gen, info = model.transcribe(file_path, language=language)
        # The generator is consumed here, inside the try, so errors raised
        # while iterating are caught as well.
        segments = list(segments_gen)
        print(f"轉錄完成!共取得 {len(segments)} 個語句段落")
    except Exception as e:
        print(f"[錯誤] 轉錄失敗:{e}")
        segments, info = [], {}
    finally:
        stop_flag.set()
        timer_thread.join()
    return segments, info
# Write the transcript to disk
def transcribe_audio(file_path, output_path):
    """Transcribe *file_path* (Chinese) and write the transcript to *output_path*.

    Each segment becomes one line of the form ``[start-end] text`` with the
    times rounded to two decimals.

    When transcription fails, nothing is written. ``transcribe_with_progress``
    signals failure by returning ``([], {})``; writing an empty transcript in
    that case would mark the video as done and cause later runs to skip it.

    Args:
        file_path: Path of the audio file to transcribe.
        output_path: Path of the UTF-8 text file to create. Its parent
            directory is created when the path has one.
    """
    segments, info = transcribe_with_progress(file_path, language='zh')
    if not segments and info == {}:
        print(f"[錯誤] 轉錄失敗,未寫入逐字稿:{output_path}")
        return

    output_dir = os.path.dirname(output_path)
    if output_dir:
        # os.makedirs("") raises, so skip it for a bare file name.
        os.makedirs(output_dir, exist_ok=True)
    with open(output_path, "w", encoding="utf-8") as f:
        f.writelines(
            f"[{round(segment.start, 2)}-{round(segment.end, 2)}] {segment.text}\n"
            for segment in segments
        )
    if segments:
        total_audio_time = round(segments[-1].end, 2)
        print(f"語音總長度(實際轉錄):{total_audio_time} 秒")
# List every video id in a playlist
def get_playlist_video_ids(playlist_url):
    """Return the video ids yt-dlp lists for *playlist_url*.

    Args:
        playlist_url: URL of the playlist.

    Returns:
        A list of non-empty id strings in the order yt-dlp printed them.
        The list is empty when yt-dlp prints nothing, so callers never see a
        phantom ``""`` id.
    """
    result = subprocess.run(
        ["yt-dlp", "--flat-playlist", "--print", "id", playlist_url],
        capture_output=True,
        text=True,
    )
    return [line.strip() for line in result.stdout.splitlines() if line.strip()]
# Merge the transcripts of one directory
def merge_transcripts(transcript_dir, merged_filename="merged_transcript.txt"):
    """Concatenate every ``.txt`` file in *transcript_dir* into one file.

    The files are processed in sorted name order, each preceded by a
    ``===== <file name> =====`` header. The merged file itself is excluded
    from the inputs and is overwritten on every call.

    Args:
        transcript_dir: Directory holding the individual transcripts.
        merged_filename: Name of the merged file created inside that directory.
    """
    merged_path = os.path.join(transcript_dir, merged_filename)
    with open(merged_path, "w", encoding="utf-8") as out:
        names = [
            entry
            for entry in os.listdir(transcript_dir)
            if entry != merged_filename and entry.endswith(".txt")
        ]
        names.sort()
        for entry in names:
            source_path = os.path.join(transcript_dir, entry)
            out.write(f"\n\n===== {entry} =====\n")
            with open(source_path, "r", encoding="utf-8") as src:
                out.write(src.read())
    print(f"✅ 合併逐字稿已完成,檔案路徑:{merged_path}")
# Main program
def _process_video(video_url, download_dir, transcript_dir):
    """Download and transcribe one video unless its transcript already exists.

    Args:
        video_url: URL of the video to process.
        download_dir: Directory that receives the ``.mp3`` file.
        transcript_dir: Directory that receives the ``.txt`` transcript.
    """
    start_time = time.time()
    title = get_video_title(video_url)
    safe_video_title = sanitize_filename(title)
    output_file = os.path.join(transcript_dir, f"{safe_video_title}.txt")
    if os.path.exists(output_file):
        # An existing transcript means the download and transcription are skipped.
        print(f"⚠️ 逐字稿已存在,跳過影片:{safe_video_title}")
        return

    duration = get_video_duration(video_url)
    mp3_path = os.path.join(download_dir, f"{safe_video_title}.mp3")
    # An audio file left by an earlier run is reused instead of downloaded again.
    if not os.path.exists(mp3_path) and not download_audio(video_url, output_path=mp3_path):
        print(f"跳過影片 {video_url},下載失敗")
        return

    transcribe_audio(mp3_path, output_file)
    elapsed = time.time() - start_time
    if not os.path.exists(output_file):
        # Do not report success when no transcript was produced.
        print(f"[錯誤] 轉錄失敗,未產生逐字稿:{output_file}")
        return
    speed_ratio = duration / elapsed if elapsed > 0 else 0
    print(f"✅ 完成:{output_file}")
    print(f"總耗時:{elapsed:.2f} 秒,影片長度:{duration} 秒,轉換速度比:約 {speed_ratio:.2f}x")


def _process_playlist(url):
    """Process every video of the playlist at *url*, then merge its transcripts."""
    raw_title = get_playlist_title(url)
    # Fall back to a fixed name so an unusable title cannot collapse the
    # playlist directory into the parent directory.
    safe_title = sanitize_filename(raw_title) or "playlist"
    print(f"\n📂 開始處理播放清單:{raw_title} ({url})")
    playlist_download_dir = os.path.join("downloads", safe_title)
    playlist_transcript_dir = os.path.join("transcripts", safe_title)
    os.makedirs(playlist_download_dir, exist_ok=True)
    os.makedirs(playlist_transcript_dir, exist_ok=True)

    # Drop empty ids so an empty yt-dlp listing does not yield a bogus URL.
    video_ids = [vid for vid in get_playlist_video_ids(url) if vid.strip()]
    for idx, vid in enumerate(video_ids, 1):
        video_url = f"https://youtu.be/{vid}"
        print(f"\n🎞️ ({idx}/{len(video_ids)}) 處理影片:{video_url}")
        _process_video(video_url, playlist_download_dir, playlist_transcript_dir)

    # Merge the transcripts once the whole playlist has been processed.
    merge_transcripts(playlist_transcript_dir)


def main():
    """Process every URL listed in ``video_playlist.txt`` (one per line)."""
    os.makedirs("downloads", exist_ok=True)
    os.makedirs("transcripts", exist_ok=True)
    with open("video_playlist.txt", "r", encoding="utf-8") as f:
        urls = [line.strip() for line in f if line.strip()]

    for url in urls:
        if is_playlist_url(url):
            _process_playlist(url)
        else:
            # Single video
            print(f"\n🎞️ 單支影片處理:{url}")
            _process_video(url, "downloads", "transcripts")


if __name__ == "__main__":
    main()