from __future__ import annotations from channels.generic.websocket import WebsocketConsumer from django.contrib.auth.models import User import json from .objects.GameManager import GameManager from .objects.tictactoe.TicTacToePlayer import TicTacToePlayer import time from typing import TYPE_CHECKING if TYPE_CHECKING: from .objects.pong.PongSpectator import PongSpectator from .objects.pong.PongPlayer import PongPlayer from .objects.pong.PongGame import PongGame from .objects.tictactoe.TicTacToeGame import TicTacToeGame from .objects.tictactoe.TicTacToeSpectator import TicTacToeSpectator game_manager: GameManager = GameManager() class TicTacToeWebSocket(WebsocketConsumer): def connect(self): self.user: User = self.scope["user"] if (self.user.pk is None): self.user.pk = 0 self.accept() self.game_id = int(self.scope['url_route']['kwargs']['game_id']) self.game: TicTacToeGame = game_manager.get(self.game_id, "tictactoe") self.member = self.game.join(self.user.pk, self) if (isinstance(self.member, TicTacToePlayer)): self.member.send(self.member.sign) if (self.game._everbody_is_here() and self.game.model.started == False): self.game.broadcast("game_start") self.game.model.start() def receive(self, text_data=None, bytes_data=None): data = json.loads(text_data) if (data.get("targetMorpion") is not None and data.get("targetCase") is not None): if (self.game.add(data, self.member) == False): return self.game.broadcast("", data, [self.member]) pass def disconnect(self, event): self.member.socket = None self.game.time = time.time() self.game.broadcast("opponent_leave_timer") class PongWebSocket(WebsocketConsumer): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.member = None def connect(self): self.user: User = self.scope["user"] if (self.user.pk is None): self.user.pk = 0 self.accept() self.game_id = int(self.scope['url_route']['kwargs']['game_id']) self.game: PongGame = game_manager.get(self.game_id, "pong") if (self.game is None): self.send(text_data=json.dumps({"detail": "Game not found"})) self.disconnect(1404) return self.member: PongPlayer | PongSpectator = self.game.join(self.user.pk, self) def disconnect(self, code): if (self.member is not None): self.member.disconnect() super().disconnect(code) def receive(self, text_data: str = None, bytes_data: bytes = None): if (text_data is None): return data: dict = json.loads(text_data) self.member.receive(data)