from transcendence.abstract.AbstractRoom import AbstractRoom from transcendence.abstract.AbstractRoomMember import AbstractRoomMember from channels.generic.websocket import WebsocketConsumer from ..AGame import AGame from .Ball import Ball from .PongPlayer import PongPlayer from .PongSpectator import PongSpectator from .Wall import Wall from .Point import Point from .Segment import Segment import math from ... import config from ...routine import routine import threading from typing import TYPE_CHECKING if TYPE_CHECKING: pass class PongGame(AGame): def __init__(self, game_id: int, game_manager): super().__init__("pong", game_id, game_manager) self.ball: Ball = Ball() self.stopped: bool = False radius: float = min(config.MAP_SIZE_X, config.MAP_SIZE_Y) / 2 - 10 players_id: list[int] = self.model.get_players_id() nb_sides = 4 polygon: list[Point] = [] for i in range(nb_sides): angle: float = (i * 2 * math.pi / nb_sides) + (math.pi * 3 / nb_sides) x: float = round(config.MAP_CENTER_X + radius * math.cos(angle)) y: float = round(config.MAP_CENTER_Y + radius * math.sin(angle)) polygon.append(Point(x, y)) segments: list[Segment] = [] for i in range(nb_sides): segments.append(Segment(polygon[i], polygon[(i + 1) % nb_sides])) self.walls: list[Wall] self.players: list[PongPlayer] nb_players: int = len(players_id) if (nb_players == 2): self.players = [PongPlayer(self, players_id[0], None, segments[0]), PongPlayer(self, players_id[1], None, segments[2])] self.walls = [Wall(segments[1].start, segments[1].stop), Wall(segments[3].start, segments[3].stop)] else: self.players = [] self.walls = [] for i in range(4): if (i < nb_players): self.players.append(PongPlayer(self, players_id[i], None, segments[i])) else: self.walls.append(Wall(segments[i])) self.spectators: list[PongSpectator] self._updated_players: list[PongPlayer] = [] self.game_id: int = game_id self.thread = threading.Thread(target = routine, args=(self,)) self.thread.start() def goal(self, goal_taker: PongPlayer): timestamp = goal_taker.add_goal() self.broadcast("goal", {"player_id": goal_taker.user_id, "timestamp": timestamp}) if (len(goal_taker.score) >= config.GAME_MAX_SCORE): connected_players: list[PongPlayer] = self.get_players_connected() if (len(connected_players) == 2): self.finish(connected_players[not connected_players.index(goal_taker)]) else: goal_taker.eliminate() def _send_game_data(self, member: PongSpectator | PongPlayer): member.send("init_game", self.to_dict()) def _everbody_is_here(self): return len(self.players) == len(self.get_players_connected()) def _nobody_is_here(self): return len(self.get_players_connected()) == 0 def _player_join(self, user_id: int, socket: WebsocketConsumer): if (self.model.started): return None player = self.get_player_by_user_id(user_id) if (player is None): return None # check if player is already connected if (player.is_connected()): player.disconnect(1001) player.socket = socket if (self._everbody_is_here()): self.start() self.update_player(player) return player def update_player(self, player: PongPlayer): self._updated_players.append(player) def finish(self, winner: PongPlayer): self.broadcast("finish", {"winner": winner.to_dict()}) self.model.finish(winner.user_id) def _player_leave(self, player: PongPlayer): connected_players: list[PongPlayer] = self.get_players_connected() if (self.model.started): if (len(connected_players) == 1): print([player.username for player in connected_players]) last_player: PongPlayer = connected_players[0] self.finish(last_player) return self.update_player(player) def _spectator_join(self, user_id: int, socket: WebsocketConsumer): spectator: PongSpectator = PongSpectator(user_id, socket, self) self.spectators.append(spectator) return spectator def _spectator_leave(self, spectator: PongSpectator): self.spectators.remove(spectator) def join(self, user_id: int, socket: WebsocketConsumer) -> PongSpectator | PongPlayer: member: PongPlayer = self._player_join(user_id, socket) if (member is None): member: PongSpectator = self._spectator_join(user_id, socket) self._send_game_data(member) return member def start(self): if (self.model.started == True): return self.model.start() def leave(self, member: AbstractRoomMember): if (isinstance(member, PongPlayer)): self._player_leave(member) elif (isinstance(member, PongSpectator)): self._spectator_leave(member) if (self._nobody_is_here()): self.stopped = True self.thread.join(10) self.game_manager.remove(self) def to_dict(self): data: dict = {"ball": self.ball.to_dict(), "players": [player.to_dict() for player in self.players], "walls": [wall.to_dict() for wall in self.walls], } return data