43 lines
1.4 KiB
Python
43 lines
1.4 KiB
Python
import os
import time

import tweepy
|
|
|
|
# Twitter API credentials.
#
# Secrets must not live in source code: anyone who can read the file can act
# as the account. Each value is read from an environment variable instead and
# is None when that variable is not set. Any credential that was ever stored
# in this file should be treated as compromised and regenerated.
API_KEY = os.environ.get('TWITTER_API_KEY')
API_KEY_SECRET = os.environ.get('TWITTER_API_KEY_SECRET')
ACCESS_TOKEN = os.environ.get('TWITTER_ACCESS_TOKEN')
ACCESS_TOKEN_SECRET = os.environ.get('TWITTER_ACCESS_TOKEN_SECRET')
BEARER_TOKEN = os.environ.get('TWITTER_BEARER_TOKEN')
|
|
|
|
# Authenticate to the Twitter API.
# The credentials are gathered into one mapping first so the constructor
# call stays on a single line.
_client_credentials = {
    'bearer_token': BEARER_TOKEN,
    'consumer_key': API_KEY,
    'consumer_secret': API_KEY_SECRET,
    'access_token': ACCESS_TOKEN,
    'access_token_secret': ACCESS_TOKEN_SECRET,
}
client = tweepy.Client(**_client_credentials)
|
|
|
|
# Twitter account to monitor.
twitter_username = 'whale_alert'

# Resolve the username to the user ID used when fetching tweets.
user = client.get_user(username=twitter_username)
if user.data is None:
    # A lookup that matches no account returns a response without data;
    # reading `.id` from it would fail with an opaque AttributeError.
    raise RuntimeError(
        f'Could not resolve Twitter user {twitter_username!r}: {user.errors}'
    )
user_id = user.data.id
|
|
|
|
# IDs of tweets that have already been printed.
processed_tweet_ids = set()


def monitor_tweets(user_id):
    """Print the user's latest tweets that have not been printed before.

    Requests up to five of the most recent tweets for ``user_id`` and prints
    ``"<id> - <text>"`` for each one whose ID is not yet in
    ``processed_tweet_ids``, then records the ID so the tweet is reported
    only once.

    Args:
        user_id: ID of the Twitter account whose tweets are fetched.
    """
    response = client.get_users_tweets(id=user_id, max_results=5)

    # `response.data` is None (not an empty list) when the request returns
    # no tweets; iterating None would raise TypeError.
    tweets = response.data or []

    for tweet in tweets:
        if tweet.id in processed_tweet_ids:
            continue
        print(f'{tweet.id} - {tweet.text}')
        processed_tweet_ids.add(tweet.id)
|
|
|
|
# Poll for new tweets at a fixed interval.
POLL_INTERVAL_SECONDS = 60

try:
    while True:
        try:
            monitor_tweets(user_id)
        except (tweepy.TweepyException, OSError) as exc:
            # A failed request (API error, rate limit, or a network-level
            # failure, which surfaces as an OSError subclass) must not stop
            # a long-running monitor; report it and retry on the next cycle.
            print(f'Failed to fetch tweets: {exc}')
        time.sleep(POLL_INTERVAL_SECONDS)
except KeyboardInterrupt:
    # Ctrl+C is the normal way to stop the monitor; exit without a traceback.
    print('Monitoring stopped.')
|