42_ft_transcendence/games/objects/Game.py

216 lines
6.8 KiB
Python

from transcendence.abstract.AbstractRoom import AbstractRoom
from transcendence.abstract.AbstractRoomMember import AbstractRoomMember
from channels.generic.websocket import WebsocketConsumer
from asgiref.sync import async_to_sync
from .Ball import Ball
from .Player import Player
from .Spectator import Spectator
from .Wall import Wall
from .Point import Point
from .Segment import Segment
import math
from .. import config
from ..models import GameModel
from ..routine import routine
import threading
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from .GameManager import GameManager
class Game(AbstractRoom):
    """Room holding the state of one game.

    A game owns a ball, a list of players, a list of walls and a list of
    spectators. The arena is a 4-sided polygon centred on the map; every
    side is assigned either to a player or to a wall. Constructing a game
    starts a thread whose target is ``routine(self)``.
    """

    def __init__(self, game_id: int, game_manager):
        """Load the game model, build the arena and start the routine thread.

        Args:
            game_id: Primary key of the ``GameModel`` row to load.
            game_manager: Manager owning this game; ``leave`` calls its
                ``remove`` method once no player is connected anymore.

        Raises:
            Whatever ``GameModel.objects.get`` raises when no row matches
            (presumably ``GameModel.DoesNotExist`` -- Django convention,
            to be confirmed against the model).
        """
        super().__init__(None)

        self.game_manager: GameManager = game_manager
        self.ball: Ball = Ball()
        self.model: GameModel = GameModel.objects.get(pk=game_id)
        self.stopped: bool = False

        # Leave a 10-unit margin between the arena and the smallest map side.
        radius: float = min(config.MAP_SIZE_X, config.MAP_SIZE_Y) / 2 - 10

        players_id: list[int] = self.model.get_players_id()

        nb_sides = 4

        # Vertices of the arena, evenly spread on a circle around the centre.
        polygon: list[Point] = []
        for i in range(nb_sides):
            angle: float = (i * 2 * math.pi / nb_sides) + (math.pi * 3 / nb_sides)
            x: int = round(config.MAP_CENTER_X + radius * math.cos(angle))
            y: int = round(config.MAP_CENTER_Y + radius * math.sin(angle))
            polygon.append(Point(x, y))

        # One segment per side, joining each vertex to the next one
        # (the last vertex wraps around to the first).
        segments: list[Segment] = [
            Segment(polygon[i], polygon[(i + 1) % nb_sides])
            for i in range(nb_sides)
        ]

        self.walls: list[Wall]
        self.players: list[Player]

        nb_players: int = len(players_id)
        if nb_players == 2:
            # Two players: put them on opposite sides (0 and 2) and close
            # the two remaining sides with walls.
            self.players = [
                Player(self, players_id[0], None, segments[0]),
                Player(self, players_id[1], None, segments[2]),
            ]
            self.walls = [Wall(segments[1]), Wall(segments[3])]
        else:
            # Otherwise fill the sides in order with players, then walls.
            # Player ids beyond the number of sides are not given a side.
            self.players = []
            self.walls = []
            for i, segment in enumerate(segments):
                if i < nb_players:
                    self.players.append(Player(self, players_id[i], None, segment))
                else:
                    self.walls.append(Wall(segment))

        self.spectators: list[Spectator] = []

        # Players queued through update_player().
        self._updated_players: list[Player] = []

        self.game_id: int = game_id

        self.thread = threading.Thread(target=routine, args=(self,))
        self.thread.start()

    def get_players_id(self) -> list[int]:
        """Return the ``user_id`` of every player of the game."""
        return [player.user_id for player in self.players]

    def get_players_connected(self) -> list[Player]:
        """Return the players for which ``is_connected()`` is true."""
        return [player for player in self.players if player.is_connected()]

    def broadcast(self,
                  detail: str,
                  data: dict | None = None,
                  excludeds: list[Spectator | Player] | None = None):
        """Send a message to every connected player and every spectator.

        Args:
            detail: Message identifier forwarded to each member's ``send``.
            data: Payload forwarded to each member's ``send``; defaults to
                an empty dict.
            excludeds: Members that must not receive the message.
        """
        # Fresh objects per call: a mutable default would be shared between
        # every call of the method.
        if data is None:
            data = {}
        if excludeds is None:
            excludeds = []

        members: list[Player | Spectator] = self.get_players_connected() + self.spectators

        for excluded in excludeds:
            if excluded in members:
                members.remove(excluded)

        for member in members:
            member.send(detail, data)

    def goal(self, goal_taker: Player):
        """Record a goal on ``goal_taker`` and apply its consequences.

        The goal is added to ``goal_taker`` and broadcast to the room. Once
        ``goal_taker`` has accumulated ``config.GAME_MAX_SCORE`` goals:

        * if exactly two players are connected and ``goal_taker`` is one of
          them, the other one wins and the game is finished;
        * otherwise ``goal_taker`` is eliminated.

        Args:
            goal_taker: Player the goal is recorded on.
        """
        timestamp = goal_taker.add_goal()

        self.broadcast("goal", {"player_id": goal_taker.user_id,
                                "timestamp": timestamp})

        if len(goal_taker.score) < config.GAME_MAX_SCORE:
            return

        connected_players: list[Player] = self.get_players_connected()
        opponents: list[Player] = [
            player for player in connected_players if player != goal_taker
        ]

        # A single opponent among two connected players means goal_taker is
        # the other connected player: the opponent wins. If goal_taker is
        # not connected, fall back to elimination instead of failing.
        if len(connected_players) == 2 and len(opponents) == 1:
            self.finish(opponents[0])
        else:
            goal_taker.eliminate()

    def get_player_by_user_id(self, user_id: int) -> Player | None:
        """Return the player whose ``user_id`` matches, or ``None``."""
        for player in self.players:
            if player.user_id == user_id:
                return player
        return None

    def _send_game_data(self, member: Spectator | Player):
        """Send the full game state to ``member`` as an ``init_game`` message."""
        member.send("init_game", self.to_dict())

    def _everbody_is_here(self) -> bool:
        """Tell whether every player of the game is connected."""
        return len(self.players) == len(self.get_players_connected())

    def _nobody_is_here(self) -> bool:
        """Tell whether no player of the game is connected."""
        return len(self.get_players_connected()) == 0

    def _player_join(self, user_id: int, socket: WebsocketConsumer) -> Player | None:
        """Attach ``socket`` to the player identified by ``user_id``.

        Args:
            user_id: Id of the user trying to join as a player.
            socket: Consumer to attach to the player.

        Returns:
            The player, or ``None`` when the game has already started or
            when ``user_id`` is not one of the game's players.
        """
        if self.model.started:
            return None

        player = self.get_player_by_user_id(user_id)
        if player is None:
            return None

        # Drop the previous connection of a player who is already connected
        # (1001 is presumably the WebSocket "going away" close code).
        if player.is_connected():
            player.disconnect(1001)

        player.socket = socket

        if self._everbody_is_here():
            self.start()

        self.update_player(player)

        return player

    def update_player(self, player: Player):
        """Queue ``player`` in the list of updated players."""
        self._updated_players.append(player)

    def finish(self, winner: Player):
        """Broadcast the winner and mark the game model as finished."""
        self.broadcast("finish", {"winner": winner.to_dict()})
        self.model.finish(winner.user_id)

    def _player_leave(self, player: Player):
        """Handle a player leaving the game.

        When the game has started and a single player is still connected,
        that player wins and the game is finished. In every other case the
        leaving player is queued as updated.
        """
        connected_players: list[Player] = self.get_players_connected()

        if self.model.started and len(connected_players) == 1:
            self.finish(connected_players[0])
            return

        self.update_player(player)

    def _spectator_join(self, user_id: int, socket: WebsocketConsumer) -> Spectator:
        """Create a spectator for ``user_id``, register it and return it."""
        spectator: Spectator = Spectator(user_id, socket, self)
        self.spectators.append(spectator)
        return spectator

    def _spectator_leave(self, spectator: Spectator):
        """Unregister ``spectator``; raises ``ValueError`` if it is unknown."""
        self.spectators.remove(spectator)

    def join(self, user_id: int, socket: WebsocketConsumer) -> Spectator | Player:
        """Add a user to the room and send them the game state.

        The user joins as a player when possible, as a spectator otherwise.

        Returns:
            The ``Player`` or ``Spectator`` representing the user.
        """
        member: Spectator | Player | None = self._player_join(user_id, socket)
        if member is None:
            member = self._spectator_join(user_id, socket)

        self._send_game_data(member)
        return member

    def start(self):
        """Start the game model unless it is already started."""
        if self.model.started:
            return
        self.model.start()

    def leave(self, member: AbstractRoomMember):
        """Remove ``member`` from the room and stop the game if it is empty.

        Once no player is connected, the game is flagged as stopped, the
        routine thread is waited for and the game is removed from its
        manager.
        """
        if isinstance(member, Player):
            self._player_leave(member)
        elif isinstance(member, Spectator):
            self._spectator_leave(member)

        if self._nobody_is_here():
            self.stopped = True
            # Wait at most 10 seconds for the routine thread to end
            # (presumably it polls self.stopped -- confirm in routine).
            self.thread.join(10)
            self.game_manager.remove(self)

    def to_dict(self) -> dict:
        """Return the ball, players and walls as a serialisable dict."""
        return {
            "ball": self.ball.to_dict(),
            "players": [player.to_dict() for player in self.players],
            "walls": [wall.to_dict() for wall in self.walls],
        }