trading-quant/monitors/twitter.py
2024-07-24 14:29:25 +08:00

43 lines
1.4 KiB
Python

import os
import time

import tweepy
# Twitter API credentials.
#
# These are read from environment variables so that no secret is ever stored
# in the source tree. A variable that is not set yields None.
#
# SECURITY: earlier revisions of this file embedded literal credentials here.
# Anything that was committed must be treated as compromised: revoke and
# regenerate those keys in the Twitter developer portal.
API_KEY = os.environ.get('TWITTER_API_KEY')
API_KEY_SECRET = os.environ.get('TWITTER_API_KEY_SECRET')
ACCESS_TOKEN = os.environ.get('TWITTER_ACCESS_TOKEN')
ACCESS_TOKEN_SECRET = os.environ.get('TWITTER_ACCESS_TOKEN_SECRET')
BEARER_TOKEN = os.environ.get('TWITTER_BEARER_TOKEN')
# Build the Twitter API client from the module-level credential constants.
client = tweepy.Client(
    bearer_token=BEARER_TOKEN,
    consumer_key=API_KEY,
    consumer_secret=API_KEY_SECRET,
    access_token=ACCESS_TOKEN,
    access_token_secret=ACCESS_TOKEN_SECRET,
)
# The Twitter account to monitor.
twitter_username = 'whale_alert'

# Resolve the username to the user ID that the tweets lookup is keyed on.
user = client.get_user(username=twitter_username)
if user.data is None:
    # When the account cannot be resolved the response carries no data, so
    # fail with a clear message instead of an AttributeError on `.id`.
    raise LookupError(
        f'Twitter user {twitter_username!r} could not be resolved: {user.errors}'
    )
user_id = user.data.id

# IDs of tweets that have already been printed; used to skip repeats across
# successive polls.
processed_tweet_ids = set()
def monitor_tweets(user_id):
    """Print the monitored user's latest tweets that have not been seen yet.

    Requests up to five of the user's latest tweets and prints each one whose
    ID is not already in the module-level ``processed_tweet_ids`` set, then
    records the ID there so that later polls skip it.

    Args:
        user_id: ID of the Twitter user whose tweets are fetched.
    """
    response = client.get_users_tweets(id=user_id, max_results=5)
    # `response.data` is None rather than an empty list when the request
    # returns no tweets; treat that as "nothing to do" instead of crashing.
    for tweet in response.data or []:
        if tweet.id in processed_tweet_ids:
            continue
        print(f'{tweet.id} - {tweet.text}')
        processed_tweet_ids.add(tweet.id)
# Poll for new tweets at a fixed interval (every 60 seconds).
while True:
    try:
        monitor_tweets(user_id)
    except tweepy.TweepyException as exc:
        # An API-level failure (for example hitting the rate limit) should
        # not terminate a long-running monitor; report it and try again on
        # the next cycle.
        print(f'Failed to fetch tweets, will retry on the next poll: {exc}')
    time.sleep(60)