from transcendence.abstract.AbstractRoom import AbstractRoom from transcendence.abstract.AbstractRoomMember import AbstractRoomMember from channels.generic.websocket import WebsocketConsumer from asgiref.sync import async_to_sync from .Ball import Ball from .Player import Player from .Spectator import Spectator from .Wall import Wall from .Point import Point from .Segment import Segment import math from .. import config from ..models import GameModel from ..routine import routine import threading from typing import TYPE_CHECKING if TYPE_CHECKING: from .GameManager import GameManager class Game(AbstractRoom): def __init__(self, game_id: int, game_manager): super().__init__(None) self.game_manager: GameManager = game_manager self.ball: Ball = Ball() self.model: GameModel = GameModel.objects.get(pk = game_id) self.stopped: bool = False radius: float = min(config.MAP_SIZE_X, config.MAP_SIZE_Y) / 2 - 10 players_id: list[int] = self.model.get_players_id() nb_sides = 4 polygon: list[Point] = [] for i in range(nb_sides): angle: float = (i * 2 * math.pi / nb_sides) + (math.pi * 3 / nb_sides) x: float = round(config.MAP_CENTER_X + radius * math.cos(angle)) y: float = round(config.MAP_CENTER_Y + radius * math.sin(angle)) polygon.append(Point(x, y)) segments: list[Point] = [] for i in range(nb_sides): segments.append(Segment(polygon[i], polygon[(i + 1) % nb_sides])) self.walls: list[Wall] self.players: list[Player] nb_players: int = len(players_id) if (nb_players == 2): self.players = [Player(self, players_id[0], None, segments[0]), Player(self, players_id[1], None, segments[2])] self.walls = [Wall(segments[1]), Wall(segments[3])] else: self.players = [] self.walls = [] for i, side in enumerate(range(4)): if (i < nb_players): self.players.append(Player(self, players_id[i], None, segments[i])) else: self.walls.append(Wall(segments[i])) self.spectators: list[Spectator] = [] self._updated_players: list[Player] = [] self.game_id: int = game_id self.thread = threading.Thread(target = routine, args=(self,)) self.thread.start() def get_players_id(self): return [player.user_id for player in self.players] def get_players_connected(self) -> list[Player]: return [player for player in self.players if player.is_connected()] def broadcast(self, detail: str, data: dict = {}, excludeds: list[Spectator | Player] = []): members: list[Player | Spectator] = self.get_players_connected() + self.spectators for excluded in excludeds: if (excluded in members): members.remove(excluded) for member in members: member.send(detail, data) def goal(self, goal_taker: Player): timestamp = goal_taker.add_goal() self.broadcast("goal", {"player_id": goal_taker.user_id, "timestamp": timestamp}) if (len(goal_taker.score) >= config.GAME_MAX_SCORE): connected_players: list[Player] = self.get_players_connected() if (len(connected_players) == 2): self.finish(connected_players[not connected_players.index(goal_taker)]) else: goal_taker.eliminate() def get_player_by_user_id(self, user_id: int) -> Player: for player in self.players: if (player.user_id == user_id): return player return None def _send_game_data(self, member: Spectator | Player): member.send("init_game", self.to_dict()) def _everbody_is_here(self): return len(self.players) == len(self.get_players_connected()) def _nobody_is_here(self): return len(self.get_players_connected()) == 0 def _player_join(self, user_id: int, socket: WebsocketConsumer): player = self.get_player_by_user_id(user_id) if (player is None): return None # check if player is already connected if (player.is_connected()): player.disconnect(1001) player.socket = socket if (self._everbody_is_here()): self.start() self.update_player(player) return player def update_player(self, player: Player): self._updated_players.append(player) def finish(self, winner: Player): self.broadcast("finish", {"winner": winner.to_dict()}) self.model.finish(winner.user_id) def _player_leave(self, player: Player): connected_players: list[Player] = self.get_players_connected() if (self.model.started): if (len(connected_players) == 1): print([player.username for player in connected_players]) last_player: Player = connected_players[0] self.finish(last_player) return self.update_player(player) def _spectator_join(self, user_id: int, socket: WebsocketConsumer): spectator: Spectator = Spectator(user_id, socket, self) self.spectators.append(spectator) return spectator def _spectator_leave(self, spectator: Spectator): self.spectators.remove(spectator) def join(self, user_id: int, socket: WebsocketConsumer) -> Spectator | Player: member: Player = self._player_join(user_id, socket) if (member is None): member: Spectator = self._spectator_join(user_id, socket) self._send_game_data(member) return member def start(self): if (self.model.started == True): return self.model.start() def leave(self, member: AbstractRoomMember): if (isinstance(member, Player)): self._player_leave(member) elif (isinstance(member, Spectator)): self._spectator_leave(member) if (self._nobody_is_here()): self.stopped = True self.thread.join(10) self.game_manager.remove(self) def to_dict(self): data: dict = {"ball": self.ball.to_dict(), "players": [player.to_dict() for player in self.players], "walls": [wall.to_dict() for wall in self.walls], } return data