telethon手册

官方文档:https://docs.telethon.dev/

首先登录 https://my.telegram.org/auth,点击API development tools

2022-09-07T07:33:47.png

App title和Short name随便写,然后获取api_id和api_hash即可

测试连接


from telethon import TelegramClient
import socks
#########登录信息############
api_id = 18215199  # your telegram api id
api_hash = '6c04b7383184262389c3ff169f0a47a3'  # yo
name = '+85252663276'
client = TelegramClient(name, api_id, api_hash, proxy=(socks.SOCKS5, 'localhost', 10800))
#此处的some_name是一个随便起的名称,第一次运行会让你输入手机号和验证码,之后会生成一个some_name.session的文件,再次运行的时候就不需要反复输入手机号验证码了
async def main():
    # Now you can use all client methods listed below, like for example...
    await client.send_message('me', 'Hello to myself!')

with client:
    client.loop.run_until_complete(main())

获取聊天信息


from telethon import TelegramClient, events, sync
import time
import socks
import os
import asyncio
import telethon

#########登录信息############
api_id = 18215199  # your telegram api id
api_hash = '6c04b7383184262389c3ff169f0a47a3'  # yo
phone_number = '+85252663276'  # 用户号码
client = TelegramClient(phone_number, api_id, api_hash, proxy=(socks.SOCKS5, 'localhost', 10800))

with client:
    for dialog in client.iter_dialogs():
        print(dialog.name)

并发下载某个频道的所有文件

# 开源地址:
# 文档:https://docs.telethon.dev/en/latest/
# 博客文档:https://www.jianshu.com/p/0f61dd28d969
from telethon import TelegramClient, events, sync
import time
import socks
import os
import asyncio
import telethon

#########登录信息############
api_id = 18215199  # your telegram api id
api_hash = '6c04b7383184262389c3ff169f0a47a3'  # yo
phone_number = '+85252663276'  # 用户号码
client = TelegramClient(phone_number, api_id, api_hash, proxy=(socks.SOCKS5, 'localhost', 10800))
channel_username = "免费社工库 拒绝付费"
dirs = channel_username  # 数据保存的目录


async def download_msg(message: telethon.tl.patched.Message):
    print(message.id, end="\t")
    if message.media:
        try:
            filename = message.media.document.attributes[0].file_name
            print(f"发现文件:{filename}")
            filepath = os.path.join(dirs, filename)
            if os.path.exists(filepath):
                size = os.path.getsize(filepath)
                if size == message.media.document.size:
                    print("该文件本地已经存在,跳过下载")
                    return
        except:
            return
        print("开始下载")
        path = await client.download_media(message, file=filepath, progress_callback=callback)
        print('File saved to', path)  # printed after download is done
    else:
        print("没发现媒体文件")


async def produce_msgs(queue=asyncio.Queue()):
    async for msg in client.iter_messages(channel_username):
        await queue.put(msg)
        print(f'获取到一条消息{msg}')


async def consume_msgs(queue=asyncio.Queue()):
    while True:
        msg = await queue.get()
        await download_msg(msg)
        queue.task_done()  # 每task_done一次 就从队列里删掉一个元素 是为了配合


async def main():
    if not os.path.exists(dirs):
        os.makedirs(dirs)
    queue = asyncio.Queue()  # 用来存放需要下载的文件消息

    producer = produce_msgs(queue)  # 一个生产者
    consumer_list = []  # 五个消费者
    for i in range(5):
        consumer_list.append(asyncio.create_task(consume_msgs(queue)))
        # asyncio.create_task vs loop.create_task vs asyncio.ensure_future
        # 这三个都可以用来创建一个异步的任务
        # 从Python3.7开始可以统一的使用更高阶的asyncio.create_task

    # 等待生产者完成
    await producer

    # 等待消费者完成,消费者完成后queue为空,然后停止所有的消费者
    await queue.join()
    for consumer in consumer_list:
        consumer.cancel()

    print("结束")


def callback(current, total):
    print("\r进度: {:.2f}%".format(current / total * 100), end='')

with client:
    client.loop.run_until_complete(main())

批量加群

# 加群参考:https://docs.telethon.dev/en/latest/examples/chats-and-channels.html?highlight=JoinChannelRequest#joining-a-public-channel
from telethon import TelegramClient,errors
from telethon.tl.functions.channels import JoinChannelRequest
import socks
import json
import time

#########登录信息############
api_id = 18215199  # your telegram api id
api_hash = '6c04b7383184262389c3ff169f0a47a3'  # yo
name = '+85252663276'
client = TelegramClient(name, api_id, api_hash, proxy=(socks.SOCKS5, 'localhost', 10800))


async def join_group(channel):
    await client(JoinChannelRequest(channel=channel))


async def main():
    with open('tg_group.json', 'r',encoding="utf-8") as f:
        data = json.load(f)
        for d in data:
            channel = d["id"]
            print(f"即将加入群{channel}")
            try:  # 加群有限制
                await join_group(channel=channel)
            except errors.FloodWaitError as e:
                # e.seconds is how many seconds you have
                # to wait before making the request again.
                print('Flood for', e.seconds)
                time.sleep(int(e.seconds)+5)
                await join_group(channel=channel)
            print(f"成功加入群{channel}")


with client:
    client.loop.run_until_complete(main())