core: game: split backed django

This commit is contained in:
2024-04-08 14:19:53 +02:00
parent c1624cce83
commit 25d86012ba
19 changed files with 192 additions and 117 deletions

View File

@ -0,0 +1,38 @@
from __future__ import annotations
from ... import config
from .Position import Position
from .Point import Point
import time
import math
class Ball:
def __init__(self) -> None:
self.size: float
self.position: Position
self.angle: float
self.speed: float
self.reset()
def reset(self) -> None:
self.size = config.BALL_SIZE
self.position = Position(Point(config.BALL_SPAWN_POS_X + self.size / 2, config.BALL_SPAWN_POS_Y + self.size / 2), time.time())
self.angle = math.pi * 0.3
self.speed = config.BALL_SPEED_START
def to_dict(self) -> dict:
data: dict = {
"size": self.size,
"speed": self.speed,
"position": self.position.to_dict(),
"angle": self.angle,
}
return data
def __str__(self) -> str:
return f"Ball(size: {self.size}, speed: {self.speed}, angle: {self.angle}, position: {self.position})"

View File

@ -0,0 +1,33 @@
from __future__ import annotations
from math import dist
class Point:
def __init__(self, x: float, y: float) -> None:
self.x = x
self.y = y
def __str__(self) -> str:
return f"Point(x: {self.x}, y: {self.y})"
def __repr__(self) -> str:
return f"Point(x: {self.x}, y: {self.x})"
def __eq__(self, __value: object) -> bool:
return (self.x == __value.x and self.y == __value.y)
def distance(self, point: Point):
return dist((point.x, point.y), (self.x, self.y))
def copy(self):
return Point(self.x, self.y)
def to_dict(self) -> dict:
data: dict[str: float] = {
"x": self.x,
"y": self.y,
}
return data

View File

@ -0,0 +1,188 @@
from transcendence.abstract.AbstractRoom import AbstractRoom
from transcendence.abstract.AbstractRoomMember import AbstractRoomMember
from channels.generic.websocket import WebsocketConsumer
from ..AGame import AGame
from .Ball import Ball
from .PongPlayer import PongPlayer
from .PongSpectator import PongSpectator
from .Wall import Wall
from .Point import Point
from .Segment import Segment
import math
from ... import config
from ...routine import routine
import threading
from typing import TYPE_CHECKING
if TYPE_CHECKING:
pass
class PongGame(AGame):
def __init__(self, game_id: int, game_manager):
super().__init__("pong", game_id, game_manager)
self.ball: Ball = Ball()
self.stopped: bool = False
radius: float = min(config.MAP_SIZE_X, config.MAP_SIZE_Y) / 2 - 10
players_id: list[int] = self.model.get_players_id()
nb_sides = 4
polygon: list[Point] = []
for i in range(nb_sides):
angle: float = (i * 2 * math.pi / nb_sides) + (math.pi * 3 / nb_sides)
x: float = round(config.MAP_CENTER_X + radius * math.cos(angle))
y: float = round(config.MAP_CENTER_Y + radius * math.sin(angle))
polygon.append(Point(x, y))
segments: list[Segment] = []
for i in range(nb_sides):
segments.append(Segment(polygon[i], polygon[(i + 1) % nb_sides]))
self.walls: list[Wall]
self.players: list[PongPlayer]
nb_players: int = len(players_id)
if (nb_players == 2):
self.players = [PongPlayer(self, players_id[0], None, segments[0]), PongPlayer(self, players_id[1], None, segments[2])]
self.walls = [Wall(segments[1].start, segments[1].stop), Wall(segments[3].start, segments[3].stop)]
else:
self.players = []
self.walls = []
for i in range(4):
if (i < nb_players):
self.players.append(PongPlayer(self, players_id[i], None, segments[i]))
else:
self.walls.append(Wall(segments[i]))
self.spectators: list[PongSpectator]
self._updated_players: list[PongPlayer] = []
self.game_id: int = game_id
self.thread = threading.Thread(target = routine, args=(self,))
self.thread.start()
def goal(self, goal_taker: PongPlayer):
timestamp = goal_taker.add_goal()
self.broadcast("goal", {"player_id": goal_taker.user_id,
"timestamp": timestamp})
if (len(goal_taker.score) >= config.GAME_MAX_SCORE):
connected_players: list[PongPlayer] = self.get_players_connected()
if (len(connected_players) == 2):
self.finish(connected_players[not connected_players.index(goal_taker)])
else:
goal_taker.eliminate()
def _send_game_data(self, member: PongSpectator | PongPlayer):
member.send("init_game", self.to_dict())
def _everbody_is_here(self):
return len(self.players) == len(self.get_players_connected())
def _nobody_is_here(self):
return len(self.get_players_connected()) == 0
def _player_join(self, user_id: int, socket: WebsocketConsumer):
if (self.model.started):
return None
player = self.get_player_by_user_id(user_id)
if (player is None):
return None
# check if player is already connected
if (player.is_connected()):
player.disconnect(1001)
player.socket = socket
if (self._everbody_is_here()):
self.start()
self.update_player(player)
return player
def update_player(self, player: PongPlayer):
self._updated_players.append(player)
def finish(self, winner: PongPlayer):
self.broadcast("finish", {"winner": winner.to_dict()})
self.model.finish(winner.user_id)
def _player_leave(self, player: PongPlayer):
connected_players: list[PongPlayer] = self.get_players_connected()
if (self.model.started):
if (len(connected_players) == 1):
print([player.username for player in connected_players])
last_player: PongPlayer = connected_players[0]
self.finish(last_player)
return
self.update_player(player)
def _spectator_join(self, user_id: int, socket: WebsocketConsumer):
spectator: PongSpectator = PongSpectator(user_id, socket, self)
self.spectators.append(spectator)
return spectator
def _spectator_leave(self, spectator: PongSpectator):
self.spectators.remove(spectator)
def join(self, user_id: int, socket: WebsocketConsumer) -> PongSpectator | PongPlayer:
member: PongPlayer = self._player_join(user_id, socket)
if (member is None):
member: PongSpectator = self._spectator_join(user_id, socket)
self._send_game_data(member)
return member
def start(self):
if (self.model.started == True):
return
self.model.start()
def leave(self, member: AbstractRoomMember):
if (isinstance(member, PongPlayer)):
self._player_leave(member)
elif (isinstance(member, PongSpectator)):
self._spectator_leave(member)
if (self._nobody_is_here()):
self.stopped = True
self.thread.join(10)
self.game_manager.remove(self)
def to_dict(self):
data: dict = {"ball": self.ball.to_dict(),
"players": [player.to_dict() for player in self.players],
"walls": [wall.to_dict() for wall in self.walls],
}
return data

View File

@ -0,0 +1,126 @@
from __future__ import annotations
from ... import config
from channels.generic.websocket import WebsocketConsumer
from django.contrib.auth.models import User
from .Position import Position
from ..APlayer import APlayer
from .Segment import Segment
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from .PongGame import PongGame
class PongPlayer(APlayer):
def __init__(self, game: PongGame, user_id: int, socket: WebsocketConsumer, rail: Segment) -> None:
super().__init__(user_id, socket, game)
self.position: Position = Position(0.5, 0)
self.score: list[int] = []
self.rail: Segment = rail
self.username: str = User.objects.get(pk = self.user_id).username
def eliminate(self):
self.disconnect(1000)
self.game.update_player(self)
def receive(self, data: dict):
detail: str = data.get("detail")
if (detail is None):
return
if (detail == "update_my_paddle_pos"):
self.update_position(data)
def update_position(self, data: dict):
new_position: Position = Position()
new_position.location = data.get("position")
if (new_position.position is None):
self.send_error("missing new_position")
return
new_position.time = data.get("time")
if (new_position.time is None):
self.send_error("missing time")
return
if (self.position.time > new_position.time):
self.game_member.send_error("time error")
return
distance: float = abs(self.position.position - new_position.position)
sign: int = 1 if self.position.position >= new_position.position else -1
time_difference: float = (new_position.time - self.position.time) / 1000
max_distance: float = config.PADDLE_SPEED_PER_SECOND_MAX * (time_difference) * config.PADDLE_SPEED_PER_SECOND_TOLERANCE
new_position_verified: Position = new_position.copy()
if (distance > max_distance):
new_position_verified.position = self.position.position + max_distance * sign
if (not config.PADDLE_POSITION_MIN <= new_position_verified.position <= config.PADDLE_POSITION_MAX):
new_position_verified.position = max(new_position_verified.position, config.PADDLE_POSITION_MIN)
new_position_verified.position = min(new_position_verified.position, config.PADDLE_POSITION_MAX)
invalid_pos: bool = new_position.position != new_position_verified.position
if (new_position != self.position):
self.game.update_player(self)
self.position = new_position
if (invalid_pos):
self.send("update_player", self.to_dict())
def connect(self, socket: WebsocketConsumer):
self.socket = socket
self.accept()
self.game.update_player(self)
def disconnect(self, code: int = 1000):
self.socket = None
self.game.leave(self)
def add_goal(self):
timestamp = self.game.model.add_goal(self.user_id)
self.score.append(timestamp)
return timestamp
def to_dict(self) -> dict:
data = {
"username": self.username,
"id": self.user_id,
"position": self.position.to_dict(),
"score": [*self.score],
"rail": self.rail.to_dict(),
"isConnected": self.is_connected(),
}
return data

View File

@ -0,0 +1,29 @@
from __future__ import annotations
from channels.generic.websocket import WebsocketConsumer
from transcendence.abstract.AbstractRoomMember import AbstractRoomMember
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from .PongPlayer import PongPlayer
from .PongGame import PongGame
from ..ASpectator import ASpectator
from .Ball import Ball
class PongSpectator(ASpectator):
def __init__(self, user_id: int, socket: WebsocketConsumer, game: PongGame):
super().__init__(user_id, socket)
self.game: PongGame = game
def send_paddle(self, player: PongPlayer):
self.send("update_player", player.to_dict())
def send_ball(self, ball: Ball):
self.send("update_ball", ball.to_dict())
def disconnect(self, code: int = 1000):
self.game.leave(self)

View File

@ -0,0 +1,27 @@
from __future__ import annotations
from .Point import Point
class Position:
def __init__(self, location: int | Point = 0, time: int = 0) -> None:
self.time: float = time
self.location: float = location
def copy(self):
return Position(self.location, self.time)
def to_dict(self):
data: dict = {
"time": self.time,
}
try:
data.update({"location": self.location.to_dict()})
except:
data.update({"location": self.location})
return data
def __eq__(self, __value: Position) -> bool:
return (self.location == __value.location)

View File

@ -0,0 +1,38 @@
from .Point import Point
from .Vector import Vector
import math
class Segment:
def __init__(self, start: Point, stop: Point) -> None:
self.start: Point = start
self.stop: Point = stop
def angle(self) -> float:
return math.atan2((self.start.y - self.stop.y), (self.start.x - self.stop.x))
def length(self):
return self.start.distance(self.stop)
def is_on(self, C: Point):
return (self.start.distance(C) + self.stop.distance(C) == self.length())
def __repr__(self) -> str:
return f"Segment(start: {self.start}, stop: {self.stop})"
def __str__(self) -> str:
return f"Segment(start: {self.start}, stop: {self.stop})"
def copy(self):
return Segment(self.start.copy(), self.stop.copy())
def to_dict(self) -> dict:
data: dict[str: dict] = {
"start": self.start.to_dict(),
"stop": self.stop.to_dict(),
}
return data

View File

@ -0,0 +1,35 @@
from __future__ import annotations
import math
from .Point import Point
class Vector:
def __init__(self, x: float, y: float) -> None:
self.norm: float = math.dist((0, 0), (x, y))
self.x: float = x
self.y: float = y
def __truediv__(self, denominator: float):
return Vector(self.x / denominator, self.y / denominator)
def angle(self, vector: Vector):
scalar_product: float = self.scalar(vector)
if (scalar_product is None):
return None
cos: float = scalar_product / (vector.norm * self.norm)
angle: float = math.acos(cos)
return angle
def scalar(self, vector: Vector):
return self.x * vector.x + vector.y * self.y
def __str__(self) -> str:
return f"Vector(x: {self.x}, y: {self.y}, norme: {self.norm})"
def __eq__(self, __value: Vector) -> bool:
return (self.x == __value.x and
self.x == __value.x and
self.norm == __value.norm)

View File

@ -0,0 +1,5 @@
from .Segment import Segment
class Wall(Segment):
pass