はじめに
こちらは前回の記事(twicasのAPIを叩いてコメントを取得する – たこやきたべたい)の続きです。
基本なコメント取得の方法などは前回の記事を参考にしてください。
今回の記事ではリアルタイムでコメントを自動で取得していくPythonコードを紹介したいと思います。
過去コメントは取得できないのか
まず初めに、なぜわざわざリアルタイムで取得する必要があるのかといったことを説明したいと思います。
実際には配信終了時なり任意のタイミングでコメントを取得したほうが楽ではないかと思われるかもしれません。
しかし、現状そのようなことは不可能となっています。ネットで調べるとこのような(twitcaspy.API — Twitcasting API v2 Reference — twitcaspy 1.1.0 ドキュメント)moduleがでてきて、
API.get_comments(movie_id, *, offset=0, limit=20, slice_id=None)
指定した動画のコメントの一覧を作成日時の降順で取得します。
パラメータ
movie_id (str) -- ライブのID
offset(optional) (int) --
先頭からの位置
0以上の範囲で指定できます。(デフォルト: 0)
limit(optional) (int) --
最大取得件数
1から50の範囲で指定できます。(デフォルト: 10)
(場合により、指定件数に満たない数の動画を返す可能性があります。)
slice_id(optional) (int or None) --
このslice_id以前のコメントを取得します。
1以上の範囲で指定できます。
(デフォルトでは指定されていません。[= None])
このパラメータを指定した場合、offsetは無視されます。
2018-08-28 更新
slice_id に指定可能な最小値が 1 になりました。
戻り値
以下は Result の属性です。
latelimit : LateLimit
movie_id : Raw (int) ライブのID
all_count : Raw (int) 総コメント数
comments : Comment の list
以上のような関数があって実行してみました。ただ、現状(2025/2/24時点)では動画へのアクセスはできましたがcomments欄が空白となり、コメントを取得することは不可能でした。
リアルタイム取得するためのコード
まず、リアルタイムで放送中であるかを確認する必要があります。
以前の記事でも書きましたが、復習をすると
import requests
# TwitCasting APIキー
ACCESS_TOKEN = "ここにアクセストークンを入れる"
HEADERS = {
"Authorization": f"Bearer {ACCESS_TOKEN}",
"X-Api-Version": "2.0"
}
response = requests.get("https://apiv2.twitcasting.tv/users/[任意の実況者のID]/movies", headers=HEADERS)
print(response.json())
これで任意の実況者の動画情報を得ることができます。この時の出力は
{'movies': [{'id': 'ABCDE', 'user_id': 'xxx', 'title': 'xxx',
'subtitle': xxx, 'last_owner_comment': 'xxx', 'category': 'xxx',
'link': 'xxx', 'is_live': True, 'is_recorded': "bool",……………………
となっています。注目すべきは’is_live’がTrueかFalseかであり、このjsonファイルは時系列順にならんでいるので0番目のis_livedを判定することで生放送中かそうではないかといったことが判別できます。
import requests
# TwitCasting APIキー
ACCESS_TOKEN = "ここにアクセストークンを入れる"
if not API_KEY:
raise ValueError("TWITCAS_API_KEY environment variable not set")
# 監視対象のユーザーID
USER_ID = "[任意の実況者のID]" # 例:監視したいユーザーのID
def is_live(user_id):
"""
指定されたユーザーがライブ配信中かどうかを判定する
"""
url = f"https://apiv2.twitcasting.tv/users/{user_id}/movies"
headers = {"Authorization": f"Bearer {API_KEY}"}
response = requests.get(url, headers=headers)
data = response.json()
return data.get("movies") and data["movies"][0].get("is_live")
これで生配信を行っているかどうかの判別できる関数ができました。
後は簡単、以前紹介したコメントを取得するファイルを呼び出す仕組みにして、
メインのファイル
import subprocess
import requests
import time
# TwitCasting APIキー
ACCESS_TOKEN = "ここにアクセストークンを入れる"
if not API_KEY:
raise ValueError("TWITCAS_API_KEY environment variable not set")
# 監視対象のユーザーID
USER_ID = "[任意の実況者のID]" # 例:監視したいユーザーのID
def is_live(user_id):
"""
指定されたユーザーがライブ配信中かどうかを判定する
"""
url = f"https://apiv2.twitcasting.tv/users/{user_id}/movies"
headers = {"Authorization": f"Bearer {API_KEY}"}
response = requests.get(url, headers=headers)
data = response.json()
return data.get("movies") and data["movies"][0].get("is_live")
def execute_script(script_path):
"""
指定されたスクリプトを実行する
"""
try:
subprocess.Popen(["python", script_path]) # 別プロセスで実行
print(f"Script '{script_path}' executed.")
except FileNotFoundError:
print(f"Error: Script '{script_path}' not found.")
except Exception as e:
print(f"Error executing script: {e}")
if __name__ == "__main__":
script_path = "anlysis.py" # 実行するスクリプトのパス
print("監視を開始します...")
while True:
if is_live(USER_ID):
print("ライブ配信が開始されました!")
execute_script(script_path)
break # スクリプト実行後、監視を終了する場合
# 必要であれば、breakを削除して監視を継続
else:
print("ライブ配信はまだ開始されていません...")
time.sleep(60) # 1分ごとに監視
呼び出されるファイル
anlysis.py
import os
import requests
import csv
import time
import datetime
import re
tokyo_tz = datetime.timezone(datetime.timedelta(hours=9))
dt = datetime.datetime.now(tokyo_tz)
# TwitCasting APIキー
ACCESS_TOKEN = "ここにアクセストークンを入れる"
HEADERS = {
"Authorization": f"Bearer {ACCESS_TOKEN}",
"X-Api-Version": "2.0"
}
USER_ID = "[任意の実況者のID]" # 例:監視したいユーザーのID
response = requests.get(f"https://apiv2.twitcasting.tv/users/{USER_ID}/movies", headers=HEADERS)
data = response.json()
MOVIE_ID = data["movies"][0]["id"] # 現在配信中のライブID
LIVE_TITLE = data["movies"][0]["title"] # 配信中のライブ名
CSV_FILE = f"{str(dt.year)}{str(dt.month)}{str(dt.day)}_{LIVE_TITLE}.csv" # csvファイル名生成
CSV_FILE = re.sub(r'[\\|/|:|?|.|"|<|>|\|]', '-', CSV_FILE) # ファイル名に使えないものは置換
# CSVファイルのヘッダーを書き込む(最初の1回だけ)
with open(CSV_FILE, mode='w', newline='', encoding='utf-8') as file:
writer = csv.writer(file)
writer.writerow(["id", "name", "screen_id", "message", "image", "profile"]) # ヘッダー行を変更
# 既に取得したコメントのIDを記録するセット(重複防止)
comment_ids = set()
def is_live(movie_id):
"""ライブ配信の状態を確認し、is_liveがFalseなら終了する"""
try:
response = requests.get(f"https://apiv2.twitcasting.tv/movies/{movie_id}", headers=HEADERS)
data = response.json()
return data.get("movie", {}).get("is_live", False)
except Exception as e:
print(f"ライブ状態の取得に失敗しました: {e}")
return True # 取得エラー時はとりあえず継続する
while is_live(MOVIE_ID):
try:
response = requests.get(f"https://apiv2.twitcasting.tv/movies/{MOVIE_ID}/comments", headers=HEADERS)
data = response.json()
with open(CSV_FILE, mode='a', newline='', encoding='utf-8') as file:
writer = csv.writer(file)
for comment in data["comments"]:
if comment["id"] not in comment_ids: # すでに記録されたコメントはスキップ
user = comment["from_user"]
writer.writerow([
comment["id"], # コメントID
user["name"], # ユーザー名
user["id"], # screen_id(ユーザーID)
comment["message"], # コメント内容
user.get("image", ""), # プロフィール画像URL(なければ空白)
user.get("profile", "") # プロフィール情報(なければ空白)
])
comment_ids.add(comment["id"]) # 記録済みIDを保存
print("新しいコメントを追加しました。")
time.sleep(5) # 5秒待機してから再度取得
except Exception as e:
print(f"エラーが発生しました: {e}")
time.sleep(10) # エラー時は10秒待機してリトライ
print("ライブ配信が終了しました。処理を停止します。")
コメント