188 lines
6.0 KiB
Python
188 lines
6.0 KiB
Python
from transcendence.abstract.AbstractRoom import AbstractRoom
|
|
from transcendence.abstract.AbstractRoomMember import AbstractRoomMember
|
|
|
|
from channels.generic.websocket import WebsocketConsumer
|
|
|
|
from ..AGame import AGame
|
|
from .Ball import Ball
|
|
from .PongPlayer import PongPlayer
|
|
from .PongSpectator import PongSpectator
|
|
from .Wall import Wall
|
|
from .Point import Point
|
|
from .Segment import Segment
|
|
|
|
import math
|
|
|
|
from ... import config
|
|
|
|
from ...routine import routine
|
|
|
|
import threading
|
|
|
|
from typing import TYPE_CHECKING
|
|
|
|
if TYPE_CHECKING:
|
|
from profiles.models import ProfileModel
|
|
|
|
class PongGame(AGame):
    """Server-side state of one pong match played inside a four-sided arena.

    The arena is a regular four-sided polygon inscribed in a circle centred on
    ``config.MAP_CENTER_X`` / ``config.MAP_CENTER_Y``. Each registered player
    is given one side of the polygon; sides without a player become walls.
    A thread running ``routine(self)`` is started at the end of ``__init__``.
    """

    def __init__(self, game_id: int, game_manager):
        """Build the arena, place players and walls, then start the game thread.

        Args:
            game_id: Identifier forwarded to ``AGame.__init__`` and also stored
                on the instance as ``self.game_id``.
            game_manager: Forwarded to ``AGame.__init__``; ``leave`` calls
                ``self.game_manager.remove(self)`` once no player is connected.
        """
        super().__init__("pong", game_id, game_manager)

        self.ball: Ball = Ball()
        # Set to True by leave() when no player remains connected.
        self.stopped: bool = False

        radius: float = min(config.MAP_SIZE_X, config.MAP_SIZE_Y) / 2 - 10

        # NOTE(review): self.model is not assigned in this class; presumably
        # AGame.__init__ provides it -- confirm.
        players: list[ProfileModel] = self.model.get_players()

        nb_sides = 4

        # Corners of the arena, rounded to integer coordinates.
        polygon: list[Point] = []
        for i in range(nb_sides):
            angle: float = (i * 2 * math.pi / nb_sides) + (math.pi * 3 / nb_sides)
            x: int = round(config.MAP_CENTER_X + radius * math.cos(angle))
            y: int = round(config.MAP_CENTER_Y + radius * math.sin(angle))
            polygon.append(Point(x, y))

        # One segment per side, each joining a corner to the next one.
        segments: list[Segment] = [
            Segment(polygon[i], polygon[(i + 1) % nb_sides])
            for i in range(nb_sides)
        ]

        self.walls: list[Wall]
        self.players: list[PongPlayer]

        nb_players: int = len(players)
        if nb_players == 2:
            # Two players are placed on sides 0 and 2; sides 1 and 3 are walls.
            self.players = [
                PongPlayer(self, players[0], None, segments[0]),
                PongPlayer(self, players[1], None, segments[2]),
            ]
            self.walls = [
                Wall(segments[1].start, segments[1].stop),
                Wall(segments[3].start, segments[3].stop),
            ]
        else:
            # Fill the sides in order with players; leftover sides become walls.
            self.players = []
            self.walls = []
            for i, segment in enumerate(segments):
                if i < nb_players:
                    self.players.append(PongPlayer(self, players[i], None, segment))
                else:
                    # Same two-argument form as the two-player branch above.
                    # NOTE(review): the original passed the Segment itself
                    # (``Wall(segments[i])``) -- confirm Wall's signature.
                    self.walls.append(Wall(segment.start, segment.stop))

        # _spectator_join/_spectator_leave mutate this list, so make sure it
        # exists even if the base class did not create it.
        self.spectators: list[PongSpectator]
        if not hasattr(self, "spectators"):
            self.spectators = []

        # Players queued through update_player().
        self._updated_players: list[PongPlayer] = []

        self.game_id: int = game_id

        self.thread = threading.Thread(target=routine, args=(self,))
        self.thread.start()

    def goal(self, goal_taker: PongPlayer) -> None:
        """Record a goal against ``goal_taker`` and end or shrink the game.

        The goal is broadcast to the room. Once ``goal_taker`` has accumulated
        ``config.GAME_MAX_SCORE`` entries in ``score``, the other connected
        player wins when exactly two players are connected; otherwise
        ``goal_taker`` is eliminated.

        Args:
            goal_taker: Player the goal is recorded on.

        Raises:
            ValueError: If exactly two players are connected and ``goal_taker``
                is not one of them (raised by ``list.index``).
        """
        timestamp = goal_taker.add_goal()
        self.broadcast("goal", {"player_id": goal_taker.user_id,
                                "timestamp": timestamp})

        if len(goal_taker.score) < config.GAME_MAX_SCORE:
            return

        connected_players: list[PongPlayer] = self.get_players_connected()
        if len(connected_players) == 2:
            # With two entries, the opponent sits at the "other" index.
            taker_index = connected_players.index(goal_taker)
            self.finish(connected_players[1 - taker_index])
        else:
            goal_taker.eliminate()

    def _send_game_data(self, member: PongSpectator | PongPlayer) -> None:
        """Send the full game state to ``member`` as an ``init_game`` message."""
        member.send("init_game", self.to_dict())

    def _everbody_is_here(self) -> bool:
        """Return True when every registered player is connected."""
        return len(self.players) == len(self.get_players_connected())

    def _nobody_is_here(self) -> bool:
        """Return True when no player is connected."""
        return not self.get_players_connected()

    def _player_join(self, user_id: int, socket: WebsocketConsumer) -> PongPlayer | None:
        """Attach ``socket`` to the registered player identified by ``user_id``.

        Args:
            user_id: Identifier looked up with ``get_player_by_user_id``.
            socket: Consumer stored on the player as its new socket.

        Returns:
            The player, or ``None`` when the game has already started or
            ``user_id`` does not match a player of this game.
        """
        if self.model.started:
            return None

        player = self.get_player_by_user_id(user_id)
        if player is None:
            return None

        # A player joining again replaces their previous connection.
        if player.is_connected():
            player.disconnect(1001)

        player.socket = socket

        if self._everbody_is_here():
            self.start()

        self.update_player(player)

        return player

    def update_player(self, player: PongPlayer) -> None:
        """Queue ``player`` in ``self._updated_players``."""
        self._updated_players.append(player)

    def finish(self, winner: PongPlayer) -> None:
        """Broadcast the winner to the room and record it on the model."""
        self.broadcast("finish", {"winner": winner.to_dict()})
        self.model.finish(winner.user_id)

    def _player_leave(self, player: PongPlayer) -> None:
        """Handle a player leaving the room.

        If the game has started and exactly one player is still connected,
        that player wins and nothing else is done. Otherwise the leaving
        player is queued through ``update_player``.
        """
        connected_players: list[PongPlayer] = self.get_players_connected()

        if self.model.started and len(connected_players) == 1:
            last_player: PongPlayer = connected_players[0]
            self.finish(last_player)
            return

        self.update_player(player)

    def _spectator_join(self, user_id: int, socket: WebsocketConsumer) -> PongSpectator:
        """Create a spectator for ``user_id``, register it and return it."""
        spectator: PongSpectator = PongSpectator(user_id, socket, self)
        self.spectators.append(spectator)
        return spectator

    def _spectator_leave(self, spectator: PongSpectator) -> None:
        """Remove ``spectator`` from the room's spectator list."""
        self.spectators.remove(spectator)

    def join(self, user_id: int, socket: WebsocketConsumer) -> PongSpectator | PongPlayer:
        """Add a user to the room as a player if possible, else as a spectator.

        The new member is sent the full game state before being returned.

        Args:
            user_id: Identifier of the joining user.
            socket: Consumer used to communicate with that user.

        Returns:
            The ``PongPlayer`` when ``_player_join`` accepts the user,
            otherwise a newly created ``PongSpectator``.
        """
        member: PongPlayer | PongSpectator | None = self._player_join(user_id, socket)
        if member is None:
            member = self._spectator_join(user_id, socket)
        self._send_game_data(member)
        return member

    def start(self) -> None:
        """Start the game on the model unless it is already started."""
        if self.model.started:
            return
        self.model.start()

    def leave(self, member: AbstractRoomMember) -> None:
        """Remove ``member`` from the room and tear the game down when empty.

        When no player is connected afterwards, ``stopped`` is set, the game
        thread is joined for at most 10 seconds, and the game is removed from
        its manager.
        """
        if isinstance(member, PongPlayer):
            self._player_leave(member)
        elif isinstance(member, PongSpectator):
            self._spectator_leave(member)

        if self._nobody_is_here():
            self.stopped = True
            self.thread.join(10)
            self.game_manager.remove(self)

    def to_dict(self) -> dict:
        """Return the ball, players and walls as a dict of ``to_dict()`` results."""
        data: dict = {
            "ball": self.ball.to_dict(),
            "players": [player.to_dict() for player in self.players],
            "walls": [wall.to_dict() for wall in self.walls],
        }
        return data