42_ft_transcendence/games/objects/pong/PongGame.py

188 lines
6.0 KiB
Python

from transcendence.abstract.AbstractRoom import AbstractRoom
from transcendence.abstract.AbstractRoomMember import AbstractRoomMember
from channels.generic.websocket import WebsocketConsumer
from ..AGame import AGame
from .Ball import Ball
from .PongPlayer import PongPlayer
from .PongSpectator import PongSpectator
from .Wall import Wall
from .Point import Point
from .Segment import Segment
import math
from ... import config
from ...routine import routine
import threading
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from profiles.models import ProfileModel
class PongGame(AGame):
    """Runtime state of a single pong match.

    The playfield is a regular polygon with four sides. Each side is either
    guarded by a ``PongPlayer`` or closed by a ``Wall``. Constructing the
    game also starts a thread that runs ``routine(self)``.

    NOTE(review): ``self.model``, ``self.game_manager``, ``broadcast``,
    ``get_players_connected`` and ``get_player_by_user_id`` are not defined
    here; they are presumably provided by ``AGame`` / its bases.
    """

    def __init__(self, game_id: int, game_manager):
        """Build the field, the players and the walls, then start the routine.

        Args:
            game_id: identifier forwarded to ``AGame`` and kept on the
                instance as ``game_id``.
            game_manager: owner object; ``leave`` calls its ``remove`` method
                once no player is connected any more.
        """
        super().__init__("pong", game_id, game_manager)

        self.ball: Ball = Ball()
        # Set to True by `leave` right before joining the routine thread.
        self.stopped: bool = False

        radius: float = min(config.MAP_SIZE_X, config.MAP_SIZE_Y) / 2 - 10
        players: list[ProfileModel] = self.model.get_players()

        nb_sides = 4

        # Vertices of the polygon, placed on a circle around the map centre.
        polygon: list[Point] = []
        for i in range(nb_sides):
            angle: float = (i * 2 * math.pi / nb_sides) + (math.pi * 3 / nb_sides)
            x: float = round(config.MAP_CENTER_X + radius * math.cos(angle))
            y: float = round(config.MAP_CENTER_Y + radius * math.sin(angle))
            polygon.append(Point(x, y))

        # One segment per side, joining each vertex to the next (wrapping).
        segments: list[Segment] = [
            Segment(polygon[i], polygon[(i + 1) % nb_sides])
            for i in range(nb_sides)
        ]

        self.walls: list[Wall]
        self.players: list[PongPlayer]

        nb_players: int = len(players)
        if nb_players == 2:
            # Two players get sides 0 and 2; sides 1 and 3 become walls.
            self.players = [
                PongPlayer(self, players[0], None, segments[0]),
                PongPlayer(self, players[1], None, segments[2]),
            ]
            self.walls = [
                Wall(segments[1].start, segments[1].stop),
                Wall(segments[3].start, segments[3].stop),
            ]
        else:
            # Players take the first sides in order; remaining sides are walls.
            self.players = []
            self.walls = []
            for i in range(nb_sides):
                if i < nb_players:
                    self.players.append(
                        PongPlayer(self, players[i], None, segments[i])
                    )
                else:
                    # Build the wall from the two endpoints, matching the
                    # call used in the two-player branch above.
                    self.walls.append(
                        Wall(segments[i].start, segments[i].stop)
                    )

        # A bare annotation does not create the attribute. Create the list
        # here unless a base class has already done so, so that
        # `_spectator_join` / `_spectator_leave` always have a list to use.
        self.spectators: list[PongSpectator]
        if not hasattr(self, "spectators"):
            self.spectators = []

        self._updated_players: list[PongPlayer] = []

        self.game_id: int = game_id

        # Started last so that `routine` sees a fully initialised game.
        self.thread = threading.Thread(target=routine, args=(self,))
        self.thread.start()

    def goal(self, goal_taker: PongPlayer):
        """Record a goal against ``goal_taker`` and apply the score limit.

        The goal is broadcast to the room. Once ``goal_taker`` has at least
        ``config.GAME_MAX_SCORE`` entries in ``score``: if exactly two
        players are connected the other one wins, otherwise ``goal_taker``
        is eliminated.
        """
        timestamp = goal_taker.add_goal()

        self.broadcast("goal", {"player_id": goal_taker.user_id,
                                "timestamp": timestamp})

        if len(goal_taker.score) >= config.GAME_MAX_SCORE:
            connected_players: list[PongPlayer] = self.get_players_connected()
            if len(connected_players) == 2:
                # With two entries the other player sits at index 1 - own index.
                opponent_index = 1 - connected_players.index(goal_taker)
                self.finish(connected_players[opponent_index])
            else:
                goal_taker.eliminate()

    def _send_game_data(self, member: PongSpectator | PongPlayer):
        """Send the ``init_game`` message with the full game state to ``member``."""
        member.send("init_game", self.to_dict())

    def _everbody_is_here(self):
        """Return True when every player of the game is connected."""
        return len(self.players) == len(self.get_players_connected())

    def _nobody_is_here(self):
        """Return True when no player is connected."""
        return len(self.get_players_connected()) == 0

    def _player_join(self, user_id: int, socket: WebsocketConsumer):
        """Attach ``socket`` to the player owning ``user_id``.

        Returns:
            The player, or ``None`` when the game has already started or
            ``user_id`` is not one of the game's players.
        """
        if self.model.started:
            return None

        player = self.get_player_by_user_id(user_id)
        if player is None:
            return None

        # A player that is already connected is disconnected first (code 1001)
        # so the new socket replaces the old one.
        if player.is_connected():
            player.disconnect(1001)

        player.socket = socket

        if self._everbody_is_here():
            self.start()

        self.update_player(player)

        return player

    def update_player(self, player: PongPlayer):
        """Queue ``player`` in the list of updated players."""
        self._updated_players.append(player)

    def finish(self, winner: PongPlayer):
        """Broadcast the ``finish`` message and record ``winner`` on the model."""
        self.broadcast("finish", {"winner": winner.to_dict()})
        self.model.finish(winner.user_id)

    def _player_leave(self, player: PongPlayer):
        """Handle a player leaving.

        If the game has started and exactly one player is still connected,
        that player wins and nothing else happens. Otherwise ``player`` is
        queued as updated.
        """
        connected_players: list[PongPlayer] = self.get_players_connected()

        if self.model.started and len(connected_players) == 1:
            print([remaining.username for remaining in connected_players])
            last_player: PongPlayer = connected_players[0]
            self.finish(last_player)
            return

        self.update_player(player)

    def _spectator_join(self, user_id: int, socket: WebsocketConsumer):
        """Create a spectator for ``user_id``, register it and return it."""
        spectator: PongSpectator = PongSpectator(user_id, socket, self)
        self.spectators.append(spectator)
        return spectator

    def _spectator_leave(self, spectator: PongSpectator):
        """Remove ``spectator`` from the spectator list."""
        self.spectators.remove(spectator)

    def join(self, user_id: int, socket: WebsocketConsumer) -> PongSpectator | PongPlayer:
        """Add a member to the game and send it the current game state.

        The user joins as a player when ``_player_join`` accepts it and as a
        spectator otherwise.
        """
        member: PongSpectator | PongPlayer | None = self._player_join(user_id, socket)
        if member is None:
            member = self._spectator_join(user_id, socket)

        self._send_game_data(member)
        return member

    def start(self):
        """Mark the game as started on the model; no-op if already started."""
        if self.model.started:
            return
        self.model.start()

    def leave(self, member: AbstractRoomMember):
        """Remove ``member`` and shut the game down when no player remains.

        Shutdown sets ``stopped``, waits up to 10 seconds for the routine
        thread, then removes the game from the game manager.
        """
        if isinstance(member, PongPlayer):
            self._player_leave(member)
        elif isinstance(member, PongSpectator):
            self._spectator_leave(member)

        if self._nobody_is_here():
            # NOTE(review): presumably `routine` polls `stopped` to exit its
            # loop; confirm against the routine implementation.
            self.stopped = True
            self.thread.join(10)
            self.game_manager.remove(self)

    def to_dict(self):
        """Return the ball, players and walls serialised into a dict."""
        data: dict = {
            "ball": self.ball.to_dict(),
            "players": [player.to_dict() for player in self.players],
            "walls": [wall.to_dict() for wall in self.walls],
        }
        return data