KuroCore/kurocore/main/message.py
2024-01-02 00:25:23 +03:00

332 lines
9.8 KiB
Python

import json
from enum import Enum
from typing import Union, Type, Tuple
from kurocore.utils.vk.keyboard import VkKeyboard
from kurocore.utils.vk.vk import VKApiException
from kurocore.utils.vk_utils import generate_random_id
class MessageArgs(dict):
    """Dictionary of message arguments with attribute-style read access.

    ``args.name`` behaves like ``args.get('name')``: a key that is not
    present yields ``None`` instead of raising ``AttributeError``.
    """

    def __init__(self, args: dict):
        """Populate the mapping from *args*."""
        super().__init__(args)

    def __getattr__(self, item):
        """Return the value stored under *item*, or ``None`` when absent.

        Python calls this only after normal attribute lookup has failed, so
        real dict methods and attributes are never shadowed.

        Raises:
            AttributeError: for dunder names, see below.
        """
        # Dunder lookups are protocol probes (copy, pickle, ...). Answering
        # them with None makes those protocols try to call None, so report
        # them as genuinely missing instead.
        if item.startswith('__') and item.endswith('__'):
            raise AttributeError(item)
        # Read from the dict itself so keys added or changed after
        # construction are visible through attribute access as well.
        return self.get(item)
class VkObject:
    """Base wrapper that keeps the raw payload an object was built from."""

    __slots__ = ('raw',)

    def __init__(self, raw):
        # Stored as given; this base class does not interpret the payload.
        self.raw = raw
class PhotoSize:
    """One entry of a photo's ``sizes`` list: type tag, URL and dimensions."""

    __slots__ = ('type', 'url', 'width', 'height')

    def __init__(self, raw: dict):
        """Copy the required keys out of *raw*.

        Raises:
            KeyError: if any of ``type``, ``url``, ``width`` or ``height``
                is missing from *raw*.
        """
        # Every slot is filled from the identically named key, in
        # declaration order.
        for field in self.__slots__:
            setattr(self, field, raw[field])
class StickerSize:
    """One rendition of a sticker image: URL and pixel dimensions."""

    __slots__ = ('url', 'width', 'height')

    def __init__(self, raw: dict):
        """Copy ``url``, ``width`` and ``height`` out of *raw*.

        Raises:
            KeyError: if any of the three keys is missing.
        """
        self.url, self.width, self.height = (
            raw['url'],
            raw['width'],
            raw['height'],
        )
class Photo(VkObject):
    """Photo attachment.

    Built from an attachment dict that carries the photo description under
    its ``'photo'`` key; ``raw`` ends up holding that inner dict.
    ``repr()`` / ``str()`` produce the attachment string used when sending.
    """

    __slots__ = (
        'id', 'album_id', 'owner_id', 'access_key',
        'user_id', 'text', 'date', 'sizes',
        'type'
    )

    def __init__(self, raw: dict):
        """Parse the ``raw['photo']`` payload.

        Raises:
            KeyError: if ``photo`` or one of the required fields (``id``,
                ``album_id``, ``owner_id``, ``text``, ``date``, ``sizes``)
                is missing.
        """
        super().__init__(raw['photo'])
        self.type = 'photo'
        photo: dict = self.raw
        self.id: int = photo['id']
        self.album_id: int = photo['album_id']
        self.owner_id: int = photo['owner_id']
        # Optional fields fall back to neutral defaults.
        self.access_key: str = photo.get('access_key', '')
        self.user_id: int = photo.get('user_id', 0)
        self.text: str = photo['text']
        self.date: int = photo['date']  # unix time
        self.sizes = [PhotoSize(size) for size in photo['sizes']]

    def __repr__(self):
        """Return ``photo<owner_id>_<id>`` plus ``_<access_key>`` if known."""
        ref = f'photo{self.owner_id}_{self.id}'
        # Without the access key the reference only resolves for photos the
        # sender can already see, so keep it whenever the payload had one.
        if self.access_key:
            ref = f'{ref}_{self.access_key}'
        return ref
class Sticker(VkObject):
    """Sticker attachment.

    Built from an attachment dict that carries the sticker description under
    its ``'sticker'`` key; ``raw`` ends up holding that inner dict.
    """

    __slots__ = (
        'type', 'images', 'images_with_background',
        'animation_url', 'is_allowed'
    )

    def __init__(self, raw: dict):
        """Parse the ``raw['sticker']`` payload.

        Raises:
            KeyError: if ``sticker``, ``images`` or
                ``images_with_background`` is missing.
        """
        super().__init__(raw['sticker'])
        self.type = 'sticker'
        sticker: dict = self.raw
        self.images = [StickerSize(img) for img in sticker['images']]
        self.images_with_background = [
            StickerSize(img) for img in sticker['images_with_background']
        ]
        self.animation_url = sticker.get('animation_url', '')
        self.is_allowed = sticker.get('is_allowed')

    def __repr__(self):
        """Return ``sticker<sticker_id>`` (empty id if the payload has none)."""
        # Interpolating ``self`` here would call __repr__ again and recurse
        # until RecursionError; use the identifier from the payload instead.
        # NOTE(review): assumes the payload exposes it as 'sticker_id' — confirm.
        return f"sticker{self.raw.get('sticker_id', '')}"
class DocumentType(Enum):
    """Document categories, numbered 1 through 8.

    NOTE(review): the numbers presumably mirror the ``type`` code of a VK
    ``doc`` object — confirm against the VK API reference.
    """

    TEXT = 1      # text documents
    ARCHIVE = 2   # archives
    GIF = 3       # GIF animations
    PHOTO = 4     # images
    AUDIO = 5     # audio files
    VIDEO = 6     # video files
    BOOKS = 7     # books
    UNKNOWN = 8   # anything else
class Document(VkObject):
    """Document attachment.

    Built from an attachment dict that carries the document description
    under its ``'doc'`` key; ``raw`` ends up holding that inner dict.
    ``repr()`` / ``str()`` produce the attachment string used when sending.
    """

    __slots__ = (
        'id', 'album_id', 'owner_id', 'access_key',
        'title', 'size', 'ext', 'url',
        'date', 'type'
    )

    def __init__(self, raw: dict):
        """Parse the ``raw['doc']`` payload.

        Raises:
            KeyError: if ``doc`` or one of the required fields (``id``,
                ``owner_id``, ``title``, ``size``, ``ext``, ``url``,
                ``date``, ``type``) is missing.
        """
        super().__init__(raw['doc'])
        doc: dict = self.raw
        self.id: int = doc['id']
        self.owner_id: int = doc['owner_id']
        self.title: str = doc['title']
        self.size: int = doc['size']  # size in bytes
        self.ext: str = doc['ext']  # file extension
        self.url: str = doc['url']
        self.date: int = doc['date']  # unix time
        # Stored exactly as received (not converted to DocumentType) so
        # existing comparisons against the raw value keep working.
        self.type: int = doc['type']
        # 'album_id' is a declared slot; give it a value so that reading it
        # does not raise AttributeError.
        self.album_id: int = doc.get('album_id', 0)
        self.access_key: str = doc.get('access_key', '')

    def __repr__(self):
        """Return ``doc<owner_id>_<id>`` plus ``_<access_key>`` if known."""
        ref = f'doc{self.owner_id}_{self.id}'
        # Keep the access key when the payload had one; without it the
        # reference only resolves for documents the sender can already see.
        if self.access_key:
            ref = f'{ref}_{self.access_key}'
        return ref
# An attachment is an *instance* of one of the wrappers above. The previous
# definition was a plain tuple of Type[...] objects, i.e. it described the
# classes themselves and was not a valid type expression.
Attachment = Union[Photo, Document, Sticker]
def load_attachments(raw: list[dict]) -> list[Attachment]:
    """Wrap each raw attachment dict in the class matching its ``'type'``.

    ``'photo'``, ``'doc'`` and ``'sticker'`` map to ``Photo``, ``Document``
    and ``Sticker``; any other type is wrapped in a plain ``VkObject``.
    Input order is preserved.

    Raises:
        KeyError: if an item has no ``'type'`` key (or the chosen wrapper
            misses a field it requires).
    """

    def wrap(item: dict):
        kind = item['type']
        if kind == 'photo':
            return Photo(item)
        if kind == 'doc':
            return Document(item)
        if kind == 'sticker':
            return Sticker(item)
        # Unrecognised attachment kinds are kept as opaque raw wrappers.
        return VkObject(item)

    return [wrap(item) for item in raw]
def dump_attachments(raw_attachments: list[Attachment]) -> str:
    """Join the ``repr()`` of every attachment into one comma-separated string."""
    return ','.join(repr(item) for item in raw_attachments)
class EventActions:
    """String constants naming the supported event-answer action types."""

    # Plain class attributes (not an Enum) so each value is usable as a str.
    SHOW_SNACKBAR = 'show_snackbar'
    OPEN_LINK = 'open_link'
    OPEN_APP = 'open_app'
class MessageID:
    """Identifiers of a message as returned by a send/edit call.

    Attributes:
        message_id: value of ``body['message_id']`` (0 if absent), or *body*
            itself when *body* is not a mapping.
        cmid: value of ``body['cmid']`` (0 if absent), or *body* itself when
            *body* is not a mapping.
    """

    def __init__(self, body):
        """Build from either a dict response or a bare identifier."""
        # isinstance (rather than an exact type check) so dict subclasses
        # such as OrderedDict are unpacked instead of stored wholesale.
        if isinstance(body, dict):
            self.message_id = body.get('message_id', 0)
            self.cmid = body.get('cmid', 0)
        else:
            self.message_id = self.cmid = body
class Message(VkObject):
    """A VK message (or message event) plus helpers to respond to it.

    ``raw`` holds the message dict itself: when the constructor receives an
    update that wraps the message under a ``'message'`` key, the inner dict
    is used. All API calls go through ``self.api``.
    """

    # 'raw' is not repeated here: that slot is already provided by VkObject,
    # and redeclaring a base-class slot only shadows it.
    __slots__ = (
        'vk', 'api', 'id', 'conversation_message_id', 'cmid',
        'date', 'peer_id', 'from_id', 'user_id', 'chat_id', 'is_chat',
        'original_text', 'text', 'attachments', 'payload',
        'event_id', 'forwarded_messages', 'reply_message',
        'meta'
    )

    def __init__(self, vk, api, raw):
        """Parse *raw* into attributes.

        Args:
            vk: stored as ``self.vk`` and handed to nested messages.
            api: API accessor used by the send/edit/answer helpers.
            raw: a message dict, a dict containing one under ``'message'``,
                or another ``Message`` whose raw dict is reused.
        """
        super().__init__(raw)
        self.vk = vk
        self.api = api

        if isinstance(raw, Message):
            self.raw = raw.raw
        else:
            self.raw = raw.get('message', raw)

        self.id: int = self.raw.get('id', 0)
        self.conversation_message_id: int = self.raw.get('conversation_message_id', 0)
        self.cmid: int = self.conversation_message_id  # short alias

        self.date: int = self.raw.get('date', 0)
        self.peer_id: int = self.raw.get('peer_id', 0)
        self.from_id: int = self.raw.get('from_id', 0)
        self.user_id: int = self.raw.get('user_id', self.from_id)
        # A positive remainder after subtracting the 2000000000 offset is
        # treated as "this peer is a chat".
        self.chat_id: int = self.peer_id - 2000000000
        self.is_chat: bool = self.chat_id > 0

        self.original_text: str = self.raw.get('text', '')
        self.text: str = self.original_text.lower()

        self.attachments: list[Attachment] = load_attachments(self.raw.get('attachments', []))

        # The payload may already be a dict or still be a JSON string; an
        # empty or None payload is treated as "no payload" instead of being
        # fed to json.loads, which would raise.
        raw_payload = self.raw.get('payload', '{}')
        if isinstance(raw_payload, dict):
            self.payload: dict = raw_payload
        else:
            self.payload: dict = json.loads(raw_payload) if raw_payload else {}

        self.event_id: str = self.raw.get('event_id', '')
        self.forwarded_messages: list = self.raw.get('fwd_messages', [])
        self.reply_message = (
            Message(self.vk, self.api, self.raw['reply_message'])
            if 'reply_message' in self.raw else None
        )

        self.meta: dict = {}

    async def send(self, peer_id, text: str = '', attachments: (Tuple[Attachment], Attachment, str) = '',
                   keyboard: Union[VkKeyboard, dict] = None, **kwargs):
        """Send a new message to *peer_id* via ``messages.send``.

        Args:
            peer_id: destination peer.
            text: message text; omitted from the request when empty.
            attachments: a ready attachment string, a single ``Photo`` /
                ``Document``, or an iterable of attachment objects.
            keyboard: a ``VkKeyboard`` (serialised with ``get_keyboard()``)
                or an already prepared value passed through as is.
            **kwargs: extra request parameters; ``peer_id`` and
                ``random_id`` set here take precedence.

        Returns:
            MessageID wrapping the API response.
        """
        data = kwargs.copy()
        data['peer_id'] = peer_id
        data['random_id'] = generate_random_id()

        if text:
            data['message'] = text

        if keyboard:
            if isinstance(keyboard, VkKeyboard):
                data['keyboard'] = keyboard.get_keyboard()
            else:
                data['keyboard'] = keyboard

        if attachments:
            if isinstance(attachments, str):
                data['attachment'] = attachments
            elif isinstance(attachments, (Photo, Document)):
                data['attachment'] = str(attachments)
            else:
                data['attachment'] = dump_attachments(attachments)

        return MessageID(await self.api.messages.send(**data))

    async def answer_event(self, event_data):
        """Answer the message event via ``messages.sendMessageEventAnswer``.

        *event_data* is JSON-encoded before sending. This is best effort:
        a ``VKApiException`` is swallowed and ``None`` is returned.
        """
        data = {
            'peer_id': self.peer_id,
            'event_id': self.event_id,
            'user_id': self.user_id,
            'event_data': json.dumps(event_data),
        }
        try:
            return await self.api.messages.sendMessageEventAnswer(**data)
        except VKApiException:
            # Deliberately ignored: failing to answer an event is not fatal.
            return None

    async def answer_event_hide_keyboard(self, event_data):
        """Answer the event, then replace this message's keyboard with an empty one."""
        await self.answer_event(event_data)
        await self.api.messages.edit(
            peer_id=self.peer_id,
            conversation_message_id=self.conversation_message_id,
            keyboard=VkKeyboard.get_empty_keyboard()
        )

    async def answer(self, text: str = '', attachments: (list[Attachment], Attachment, str) = '', **kwargs):
        """Send a message to the peer this message came from (see ``send``)."""
        return await self.send(self.peer_id, text, attachments, **kwargs)

    async def edit(self, text='', attachments: list[Attachment] = '', keyboard: Union[VkKeyboard, dict] = None,
                   **kwargs):
        """Edit a message in this peer, falling back to sending a new one.

        The target is ``kwargs['cmid']`` if given, else
        ``kwargs['conversation_message_id']`` if given, else this message's
        own conversation message id.

        Args:
            text: new text; omitted from the request when empty.
            attachments: a ready attachment string, a single ``Photo`` /
                ``Document``, or a list/tuple of attachment objects or raw
                attachment dicts.
            keyboard: a ``VkKeyboard`` or an already prepared value.
            **kwargs: extra request parameters.

        Returns:
            MessageID wrapping the response of ``messages.edit`` or, if that
            raised ``VKApiException``, of the fallback ``messages.send``.
        """
        data: dict = kwargs.copy()
        data['peer_id'] = self.peer_id

        if text:
            data['message'] = text

        if keyboard:
            if isinstance(keyboard, VkKeyboard):
                data['keyboard'] = keyboard.get_keyboard()
            else:
                data['keyboard'] = keyboard

        if attachments:
            if isinstance(attachments, str):
                data['attachment'] = attachments
            elif isinstance(attachments, (Photo, Document)):
                data['attachment'] = str(attachments)
            elif isinstance(attachments, (list, tuple)):
                items = attachments
                # Raw attachment dicts are wrapped first so they serialise
                # to attachment strings rather than dict reprs.
                if isinstance(items[0], dict):
                    items = load_attachments(items)
                data['attachment'] = dump_attachments(items)

        # 'cmid' is only a shorthand accepted by this method. It must be
        # removed from the request itself (not merely from kwargs), or it
        # would be sent to the API as a stray parameter.
        if 'cmid' in data:
            data['conversation_message_id'] = data.pop('cmid')
        elif 'conversation_message_id' not in data:
            data['conversation_message_id'] = self.conversation_message_id

        try:
            message_id = await self.api.messages.edit(**data)
        except VKApiException:
            # Editing failed: send the same content as a new message.
            message_id = await self.api.messages.send(**data, random_id=generate_random_id())
        return MessageID(message_id)

    async def get_by_cmid(self, cmid: int):
        """Fetch the message with conversation message id *cmid* in this peer.

        Returns:
            A new ``Message`` built from the first returned item.

        Raises:
            IndexError: if the API returns no items.
        """
        data = {
            'peer_id': self.peer_id,
            'conversation_message_ids': [cmid]
        }
        res = await self.api.messages.getByConversationMessageId(**data)
        return Message(self.vk, self.api, res['items'][0])

    def __repr__(self):
        """Return the string form of the raw message dict."""
        return str(self.raw)