官方文档:https://docs.telethon.dev/
首先登录 https://my.telegram.org/auth,点击API development tools

App title和Short name随便写,然后获取api_id和api_hash即可
测试连接
from telethon import TelegramClient
import socks
#########登录信息############
api_id = 18215199 # your telegram api id
api_hash = '6c04b7383184262389c3ff169f0a47a3' # yo
name = '+85252663276'
client = TelegramClient(name, api_id, api_hash, proxy=(socks.SOCKS5, 'localhost', 10800))
#此处的some_name是一个随便起的名称,第一次运行会让你输入手机号和验证码,之后会生成一个some_name.session的文件,再次运行的时候就不需要反复输入手机号验证码了
async def main():
# Now you can use all client methods listed below, like for example...
await client.send_message('me', 'Hello to myself!')
with client:
client.loop.run_until_complete(main())
获取聊天信息
from telethon import TelegramClient, events, sync
import time
import socks
import os
import asyncio
import telethon
#########登录信息############
api_id = 18215199 # your telegram api id
api_hash = '6c04b7383184262389c3ff169f0a47a3' # yo
phone_number = '+85252663276' # 用户号码
client = TelegramClient(phone_number, api_id, api_hash, proxy=(socks.SOCKS5, 'localhost', 10800))
with client:
for dialog in client.iter_dialogs():
print(dialog.name)
并发下载某个频道的所有文件
# 开源地址:
# 文档:https://docs.telethon.dev/en/latest/
# 博客文档:https://www.jianshu.com/p/0f61dd28d969
from telethon import TelegramClient, events, sync
import time
import socks
import os
import asyncio
import telethon
#########登录信息############
api_id = 18215199 # your telegram api id
api_hash = '6c04b7383184262389c3ff169f0a47a3' # yo
phone_number = '+85252663276' # 用户号码
client = TelegramClient(phone_number, api_id, api_hash, proxy=(socks.SOCKS5, 'localhost', 10800))
channel_username = "免费社工库 拒绝付费"
dirs = channel_username # 数据保存的目录
async def download_msg(message: telethon.tl.patched.Message):
print(message.id, end="\t")
if message.media:
try:
filename = message.media.document.attributes[0].file_name
print(f"发现文件:{filename}")
filepath = os.path.join(dirs, filename)
if os.path.exists(filepath):
size = os.path.getsize(filepath)
if size == message.media.document.size:
print("该文件本地已经存在,跳过下载")
return
except:
return
print("开始下载")
path = await client.download_media(message, file=filepath, progress_callback=callback)
print('File saved to', path) # printed after download is done
else:
print("没发现媒体文件")
async def produce_msgs(queue=asyncio.Queue()):
async for msg in client.iter_messages(channel_username):
await queue.put(msg)
print(f'获取到一条消息{msg}')
async def consume_msgs(queue=asyncio.Queue()):
while True:
msg = await queue.get()
await download_msg(msg)
queue.task_done() # 每task_done一次 就从队列里删掉一个元素 是为了配合
async def main():
if not os.path.exists(dirs):
os.makedirs(dirs)
queue = asyncio.Queue() # 用来存放需要下载的文件消息
producer = produce_msgs(queue) # 一个生产者
consumer_list = [] # 五个消费者
for i in range(5):
consumer_list.append(asyncio.create_task(consume_msgs(queue)))
# asyncio.create_task vs loop.create_task vs asyncio.ensure_future
# 这三个都可以用来创建一个异步的任务
# 从Python3.7开始可以统一的使用更高阶的asyncio.create_task
# 等待生产者完成
await producer
# 等待消费者完成,消费者完成后queue为空,然后停止所有的消费者
await queue.join()
for consumer in consumer_list:
consumer.cancel()
print("结束")
def callback(current, total):
print("\r进度: {:.2f}%".format(current / total * 100), end='')
with client:
client.loop.run_until_complete(main())
批量加群
# 加群参考:https://docs.telethon.dev/en/latest/examples/chats-and-channels.html?highlight=JoinChannelRequest#joining-a-public-channel
from telethon import TelegramClient,errors
from telethon.tl.functions.channels import JoinChannelRequest
import socks
import json
import time
#########登录信息############
api_id = 18215199 # your telegram api id
api_hash = '6c04b7383184262389c3ff169f0a47a3' # yo
name = '+85252663276'
client = TelegramClient(name, api_id, api_hash, proxy=(socks.SOCKS5, 'localhost', 10800))
async def join_group(channel):
await client(JoinChannelRequest(channel=channel))
async def main():
with open('tg_group.json', 'r',encoding="utf-8") as f:
data = json.load(f)
for d in data:
channel = d["id"]
print(f"即将加入群{channel}")
try: # 加群有限制
await join_group(channel=channel)
except errors.FloodWaitError as e:
# e.seconds is how many seconds you have
# to wait before making the request again.
print('Flood for', e.seconds)
time.sleep(int(e.seconds)+5)
await join_group(channel=channel)
print(f"成功加入群{channel}")
with client:
client.loop.run_until_complete(main())