import json from enum import Enum from typing import Union, Type, Tuple from kurocore.database import Database from kurocore.vk import VkKeyboard, VKApiException, VK from kurocore.vk.utils import generate_random_id class MessageArgs(dict): def __init__(self, args: dict): self.__args: dict = args super().__init__(args) def __getattr__(self, item): try: return self.__args[item] except KeyError: return None class VkObject: __slots__ = ( 'raw', ) def __init__(self, raw): self.raw = raw class PhotoSize: __slots__ = ( 'type', 'url', 'width', 'height' ) def __init__(self, raw: dict): self.type: str = raw['type'] self.url: str = raw['url'] self.width: int = raw['width'] self.height: int = raw['height'] class StickerSize: __slots__ = ( 'url', 'width', 'height' ) def __init__(self, raw: dict): self.url: str = raw['url'] self.width: int = raw['width'] self.height: int = raw['height'] class Photo(VkObject): __slots__ = ( 'id', 'album_id', 'owner_id', 'access_key', 'user_id', 'text', 'date', 'sizes', 'type' ) def __init__(self, raw: dict): super().__init__(raw) self.type = 'photo' self.raw: dict = raw['photo'] self.id: int = self.raw['id'] self.album_id: int = self.raw['album_id'] self.owner_id: int = self.raw['owner_id'] self.access_key: str = self.raw.get('access_key', '') self.user_id: int = self.raw.get('user_id', 0) self.text: str = self.raw['text'] self.date: int = self.raw['date'] # unix time self.sizes = [PhotoSize(r) for r in self.raw['sizes']] def __repr__(self): return f'photo{self.owner_id}_{self.id}' class Sticker(VkObject): __slots__ = ( 'type', 'images', 'images_with_background', 'animation_url', 'is_allowed' ) def __init__(self, raw: dict): super().__init__(raw) self.type = 'sticker' self.raw: dict = raw['sticker'] self.images = [StickerSize(img) for img in self.raw['images']] self.images_with_background = [StickerSize(img) for img in self.raw['images_with_background']] self.animation_url = self.raw.get('animation_url', '') self.is_allowed = self.raw.get('is_allowed') def __repr__(self): return f'sticker{self}' class DocumentType(Enum): TEXT = 1 ARCHIVE = 2 GIF = 3 PHOTO = 4 AUDIO = 5 VIDEO = 6 BOOKS = 7 UNKNOWN = 8 class Document(VkObject): __slots__ = ( 'id', 'album_id', 'owner_id', 'title', 'size', 'ext', 'url', 'date', 'type' ) def __init__(self, raw: dict): super().__init__(raw) self.raw: dict = raw['doc'] self.id: int = self.raw['id'] self.owner_id: int = self.raw['owner_id'] self.title: str = self.raw['title'] self.size: int = self.raw['size'] # размер в байтах self.ext: str = self.raw['ext'] # расширение self.url: str = self.raw['url'] self.date: int = self.raw['date'] # unix time self.type: DocumentType = self.raw['type'] def __repr__(self): return f'doc{self.owner_id}_{self.id}' Attachment = Type[Photo], Type[Document], Type[Sticker] class Message(VkObject): __slots__ = ( 'vk', 'api', 'raw', 'id', 'conversation_message_id', 'cmid', 'date', 'peer_id', 'from_id', 'user_id', 'chat_id', 'is_chat', 'original_text', 'text', 'attachments', 'payload', 'event_id', 'forwarded_messages', 'reply_message', 'meta' ) def __init__(self, api, raw): super().__init__(raw) self.api = api if type(raw) is Message: self.raw = raw.raw else: self.raw = raw.get('message', raw) self.id: int = self.raw.get('id', 0) self.conversation_message_id: int = self.raw.get('conversation_message_id', 0) self.cmid: int = self.conversation_message_id self.date: int = self.raw.get('date', 0) self.peer_id: int = self.raw.get('peer_id', 0) self.from_id: int = self.raw.get('from_id', 0) self.user_id: int = self.raw.get('user_id', self.from_id) self.chat_id: int = self.peer_id - 2000000000 self.is_chat: bool = self.chat_id > 0 self.original_text: str = self.raw.get('text', '') self.text: str = self.original_text.lower() self.attachments: list[Attachment] = load_attachments(self.raw.get('attachments', [])) raw_payload = self.raw.get('payload', '{}') if type(raw_payload) is dict: self.payload: dict = raw_payload else: self.payload: dict = json.loads(raw_payload) self.event_id: str = self.raw.get('event_id', '') self.forwarded_messages: list = self.raw.get('fwd_messages', []) self.reply_message = Message(self.api, self.raw['reply_message']) if 'reply_message' in self.raw else None self.meta: dict = {} async def send(self, peer_id, text: str = '', attachments: (Tuple[Attachment], Attachment, str) = '', keyboard: Union[VkKeyboard, dict] = None, **kwargs): data = kwargs.copy() data.update({ 'peer_id': peer_id, 'random_id': generate_random_id() }) if text: data.update({'message': text}) if keyboard: if type(keyboard) is VkKeyboard: data.update({'keyboard': keyboard.get_keyboard()}) else: data.update({'keyboard': keyboard}) if attachments: if type(attachments) is str: data.update({'attachment': attachments}) elif type(attachments) in (Photo, Document): data.update({'attachment': str(attachments)}) else: data.update({'attachment': dump_attachments(attachments)}) return MessageID(await self.api.messages.send(**data)) async def answer_event(self, event_data): data = { 'peer_id': self.peer_id, 'event_id': self.event_id, 'user_id': self.user_id, 'event_data': json.dumps(event_data), } try: return await self.api.messages.sendMessageEventAnswer(**data) except VKApiException: ... async def answer_event_hide_keyboard(self, event_data): await self.answer_event(event_data) await self.api.messages.edit( peer_id=self.peer_id, conversation_message_id=self.conversation_message_id, keyboard=VkKeyboard.get_empty_keyboard() ) async def answer(self, text: str = '', attachments: (list[Attachment], Attachment, str) = '', **kwargs): return await self.send(self.peer_id, text, attachments, **kwargs) async def edit(self, text='', attachments: list[Attachment] = '', keyboard: Union[VkKeyboard, dict] = None, **kwargs): data: dict = kwargs.copy() data.update({ 'peer_id': self.peer_id }) if text: data.update({'message': text}) if keyboard: if type(keyboard) is VkKeyboard: data.update({'keyboard': keyboard.get_keyboard()}) else: data.update({'keyboard': keyboard}) if attachments: if type(attachments) is str: data.update({'attachment': attachments}) elif type(attachments) in (Photo, Document): data.update({'attachment': str(attachments)}) elif type(attachments) is list: if type(attachments[0]) is dict: data.update({'attachment': dump_attachments(load_attachments(attachments))}) else: data.update({'attachment': dump_attachments(attachments)}) if 'cmid' not in kwargs and 'conversation_message_id' not in kwargs: data.update({ 'conversation_message_id': self.conversation_message_id }) if 'cmid' in kwargs: data.update({ 'conversation_message_id': kwargs['cmid'] }) kwargs.pop('cmid') try: message_id = await self.api.messages.edit(**data) except VKApiException: message_id = await self.api.messages.send(**data, random_id=generate_random_id()) return MessageID(message_id) async def get_by_cmid(self, cmid: int): data = { 'peer_id': self.peer_id, 'conversation_message_ids': [cmid] } res = await self.api.messages.getByConversationMessageId(**data) return Message(self.api, res['items'][0]) def __repr__(self): return str(self.raw) class MessageContext: __slots__ = ('api', 'msg', 'db', 'args', 'meta') def __init__(self, api, msg, db): self.api: VK = api self.msg: Message = msg self.db: Database = db self.args = None self.meta = {} def __repr__(self): return f'' def load_attachments(raw: list[dict]) -> list[Attachment]: attachments = [] for attachment in raw: match attachment['type']: case 'photo': attachments.append(Photo(attachment)) case 'doc': attachments.append(Document(attachment)) case 'sticker': attachments.append(Sticker(attachment)) case _: attachments.append(VkObject(attachment)) return attachments def dump_attachments(raw_attachments: list[Attachment]) -> str: return ','.join(map(repr, raw_attachments)) class EventActions: SHOW_SNACKBAR = 'show_snackbar' OPEN_LINK = 'open_link' OPEN_APP = 'open_app' class MessageID: def __init__(self, body): if type(body) is dict: self.message_id = body.get('message_id', 0) self.cmid = body.get('cmid', 0) else: self.message_id = body self.cmid = body