This commit is contained in:
AdrienLSH
2024-05-14 08:50:37 +02:00
parent 95f0097ce5
commit e308e8f012
231 changed files with 70 additions and 22 deletions

View File

@ -0,0 +1,31 @@
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import permissions, status
from django.http import HttpRequest
from . import config
class GameConfigView(APIView):
permission_classes = (permissions.AllowAny,)
def get(self, request: HttpRequest):
config_data = {
"MAP_SIZE_X": config.MAP_SIZE_X,
"MAP_SIZE_Y": config.MAP_SIZE_Y,
"WALL_RATIO": config.WALL_RATIO,
"PADDLE_SPEED_PER_SECOND_MAX": config.PADDLE_SPEED_PER_SECOND_MAX,
"PADDLE_RATIO": config.PADDLE_RATIO,
"BALL_SIZE": config.BALL_SIZE,
"BALL_SPEED_INC": config.BALL_SPEED_INC,
"BALL_SPEED_START": config.BALL_SPEED_START,
"STROKE_THICKNESS": config.STROKE_THICKNESS,
"GAME_MAX_SCORE": config.GAME_MAX_SCORE,
}
return Response(config_data, status = status.HTTP_200_OK)

View File

@ -0,0 +1,28 @@
from rest_framework.viewsets import ViewSet
from rest_framework.response import Response
from rest_framework import permissions
from django.http import HttpRequest
from django.contrib.auth.models import User
from django.shortcuts import get_object_or_404
from .models import GameMembersModel, GameModel
from .serializers import GameSerializer
class GameHistoryView(ViewSet):
queryset = User.objects.all()
serializer_class = GameSerializer
permission_classes = (permissions.IsAuthenticatedOrReadOnly,)
def retrive(self, request: HttpRequest, pk: int = None):
user: User = get_object_or_404(User, pk=pk)
member_game_model_list: list[GameMembersModel] = GameMembersModel.objects.filter(player=user)
game_model_list: list[GameModel] = [member_game_model.game for member_game_model in member_game_model_list]
games_data: list[dict] = self.serializer_class(game_model_list, many=True).data
return Response(games_data)

View File

@ -0,0 +1,27 @@
from rest_framework import viewsets
from rest_framework.response import Response
from rest_framework import permissions, status
from rest_framework.authentication import SessionAuthentication
from django.http import HttpRequest
from django.db.models import QuerySet
from .models import GameModel
from .serializers import GameSerializer
# Create your views here.
class GameViewSet(viewsets.ModelViewSet):
queryset = GameModel.objects
serializer_class = GameSerializer
permission_classes = (permissions.AllowAny,)
authentication_classes = (SessionAuthentication,)
def retrieve(self, request: HttpRequest, pk):
if (not self.queryset.filter(pk = pk).exists()):
return Response({"detail": "Game not found."}, status=status.HTTP_404_NOT_FOUND)
game = self.queryset.get(pk = pk)
return Response(self.serializer_class(game).data, status=status.HTTP_200_OK)

0
django/games/__init__.py Normal file
View File

6
django/games/admin.py Normal file
View File

@ -0,0 +1,6 @@
from django.contrib import admin
from .models import GameModel
# Register your models here.
admin.site.register(GameModel)

6
django/games/apps.py Normal file
View File

@ -0,0 +1,6 @@
from django.apps import AppConfig
class GamesConfig(AppConfig):
default_auto_field = 'django.db.models.BigAutoField'
name = 'games'

24
django/games/config.py Normal file
View File

@ -0,0 +1,24 @@
PADDLE_SPEED_PER_SECOND_MAX = 0.6
PADDLE_SPEED_PER_SECOND_TOLERANCE = 1.01
PADDLE_RATIO = 0.3
PADDLE_POSITION_MIN: float = PADDLE_RATIO / 2
PADDLE_POSITION_MAX: float = 1 - PADDLE_POSITION_MIN
MAP_SIZE_X = 700
MAP_SIZE_Y = 700
MAP_CENTER_X = MAP_SIZE_X / 2
MAP_CENTER_Y = MAP_SIZE_Y / 2
WALL_RATIO = 1
BALL_SPEED_INC = 50
BALL_SPEED_START = 170
BALL_SIZE = 4
BALL_SPAWN_POS_X = MAP_SIZE_X / 2
BALL_SPAWN_POS_Y = MAP_SIZE_Y / 2
SERVER_TPS = 20
STROKE_THICKNESS = 10
GAME_MAX_SCORE = 3

121
django/games/consumers.py Normal file
View File

@ -0,0 +1,121 @@
from __future__ import annotations
from channels.generic.websocket import WebsocketConsumer
from django.contrib.auth.models import User
import json
from .objects.GameManager import GameManager
from .objects.tictactoe.TicTacToePlayer import TicTacToePlayer
import time
from .objects.pong.PongPlayer import PongPlayer
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from .objects.pong.PongSpectator import PongSpectator
from .objects.pong.PongGame import PongGame
from .objects.tictactoe.TicTacToeGame import TicTacToeGame
game_manager: GameManager = GameManager()
class TicTacToeWebSocket(WebsocketConsumer):
def connect(self):
self.user: User = self.scope["user"]
if (self.user.pk is None):
self.user.pk = 0
self.accept()
self.game_id = int(self.scope['url_route']['kwargs']['game_id'])
self.game: TicTacToeGame = game_manager.get(self.game_id, "tictactoe")
if (self.game is None):
return
self.member = self.game.join(self.user.pk, self)
if (isinstance(self.member, TicTacToePlayer)): # Send player sign (x or o) to the player
self.member.send(self.member.sign)
if (self.game._everbody_is_here() and self.game.model.started == True and self.game.model.finished == False): # if an user disconnected and joined back
self.member.send("catchup", {"morpion" : self.game._map, "turn" : self.game.turn, "currentMorpion": self.member.currentMorpion})
return
if (self.game._everbody_is_here() and self.game.model.started == False): # Launch the game if the two players are connected
self.game.broadcast("game_start")
self.game.time = time.time()
self.game.model.start()
def receive(self, text_data=None, bytes_data=None):
data = json.loads(text_data)
if (data.get("targetMorpion") is not None and data.get("targetCase") is not None): # A move has been played
if (self.game.add(data, self.member) == False): # If the move is invalid
return
self.game.broadcast("game_move", data, [self.member])
if (data.get("timerIsDue") is not None and self.game.time + 20 < time.time()): # Frontend asking if the timer is due
self.winner = 'x' if self.game.turn == 'o' else 'o'
self.game.model.finish(self.user)
self.game.broadcast("game_end", {"winning_sign": self.winner})
if (self.game.checkWin() != False): # Check if after a move, the game is finished
self.winner = self.game.checkWin()
self.game.model.finish(self.user)
self.game.broadcast("game_end", {"winning_sign": self.winner})
def disconnect(self, event):
try:
self.member.socket = None
except:
pass
class PongWebSocket(WebsocketConsumer):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.member = None
def connect(self):
self.user: User = self.scope["user"]
if (self.user.pk is None):
self.user.pk = 0
self.accept()
self.game_id = int(self.scope['url_route']['kwargs']['game_id'])
self.game: PongGame = game_manager.get(self.game_id, "pong")
if (self.game is None):
self.send(text_data=json.dumps({"detail": "Game not found"}))
self.disconnect(1404)
return
self.member: PongPlayer | PongSpectator = self.game.join(self.user, self)
def disconnect(self, code):
if (self.member is not None):
self.member.disconnect()
super().disconnect(code)
def receive(self, text_data: str = None, bytes_data: bytes = None):
if (text_data is None):
return
data: dict = json.loads(text_data)
if (isinstance(self.member, PongPlayer)):
self.member.receive(data)

66
django/games/models.py Normal file
View File

@ -0,0 +1,66 @@
from __future__ import annotations
from django.db import models
from django.db.models import QuerySet, CASCADE
from django.contrib.auth.models import User
import time
class GameModel(models.Model):
finished = models.BooleanField(default = False)
started = models.BooleanField(default = False)
winner = models.ForeignKey(User, on_delete=CASCADE, null=True, blank=True)
start_timestamp = models.BigIntegerField(null = True, blank = True)
stop_timestamp = models.BigIntegerField(null = True, blank = True)
game_type = models.CharField(max_length = 60, default = "pong")
def create(self, players: set[User]) -> GameModel:
self.save()
for player in players:
GameMembersModel(game = self, player=player).save()
return self
def start(self):
self.start_timestamp = round(time.time() * 1000, 1)
self.started = True
self.save()
def finish(self, winner: User):
self.winner = winner
self.finished = True
self.stop_timestamp = round(time.time() * 1000, 1)
self.save()
def get_players(self) -> list[User]:
return [game_player.player for game_player in GameMembersModel.objects.filter(game = self)]
def get_players_profiles(self) -> list[User]:
return [game_player.player.profilemodel for game_player in GameMembersModel.objects.filter(game = self)]
def get_score_by_player_id(self, player_id: int) -> list[int]:
query: QuerySet = GameGoalModel.objects.filter(game_id = self.pk, player_id = player_id)
score_data: list[int] = [game_goal.timestamp for game_goal in query]
return score_data
def add_goal(self, goal_defenser: User):
timestamp: int = round(time.time() * 1000, 1) - self.start_timestamp
goal_model: GameGoalModel = GameGoalModel(player=goal_defenser, game=self, timestamp=timestamp)
goal_model.save()
return timestamp
class GameMembersModel(models.Model):
game = models.ForeignKey(GameModel, on_delete=CASCADE)
player = models.ForeignKey(User, on_delete=CASCADE)
class GameGoalModel(models.Model):
game = models.ForeignKey(GameModel, on_delete=CASCADE)
player = models.ForeignKey(User, on_delete=CASCADE)
timestamp = models.IntegerField()

View File

@ -0,0 +1,50 @@
from transcendence.abstract.AbstractRoom import AbstractRoom
from transcendence.abstract.AbstractRoomMember import AbstractRoomMember
from .APlayer import APlayer
from .ASpectator import ASpectator
from ..models import GameModel
from django.contrib.auth.models import User
class AGame(AbstractRoom):
def __init__(self, game_type: str, game_id: int, game_manager):
super().__init__(game_manager)
self.game_manager = game_manager
self.model: GameModel = GameModel.objects.get(pk = game_id, game_type = game_type)
players: list[User] = self.model.get_players()
self.players: list[APlayer] = [APlayer(player.pk, None, self) for player in players]
self.spectators: list[ASpectator] = []
self.game_id: int = game_id
def get_players_id(self) -> list[int]:
return [player.pk for player in self.players]
def get_players_connected(self) -> list[APlayer]:
return [player for player in self.players if player.is_connected()]
def get_player_by_user_id(self, user_id: int) -> APlayer:
for player in self.players:
if (player.user.pk == user_id):
return player
return None
def broadcast(self, detail: str, data: dict = {}, excludeds: list[ASpectator | APlayer] = []):
members: list[APlayer | ASpectator] = self.get_players_connected() + self.spectators
for excluded in excludeds:
if (excluded in members):
members.remove(excluded)
for member in members:
member.send(detail, data)

View File

@ -0,0 +1,19 @@
from __future__ import annotations
from channels.generic.websocket import WebsocketConsumer
from .ASpectator import ASpectator
class APlayer(ASpectator):
def is_connected(self) -> bool:
return self.socket != None
def send_error(self, error_message: str, error_data = {}):
data: dict = {
"error_message": error_message
}
data.update(error_data)
self.send("error", data)

View File

@ -0,0 +1,19 @@
from channels.generic.websocket import WebsocketConsumer
from transcendence.abstract.AbstractRoomMember import AbstractRoomMember
from django.contrib.auth.models import User
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from .AGame import AGame
class ASpectator(AbstractRoomMember):
def __init__(self, user: User, socket: WebsocketConsumer, game):
super().__init__(user, socket)
self.game: AGame = game

View File

@ -0,0 +1,34 @@
from ..models import GameModel
from .pong.PongGame import PongGame
from .tictactoe.TicTacToeGame import TicTacToeGame
class GameManager():
def __init__(self) -> None:
self._game_list: list[PongGame | TicTacToeGame] = []
def remove(self, game: PongGame | TicTacToeGame) -> None:
if (game not in self._game_list):
return
self._game_list.remove(game)
def get(self, game_id: int, game_type: str) -> TicTacToeGame | PongGame:
if (not GameModel.objects.filter(pk=game_id, finished=False, game_type=game_type).exists()):
return None
for game in self._game_list:
game: PongGame | TicTacToeGame
if (game.game_id == game_id):
return game
game: PongGame | TicTacToeGame
if (game_type == "pong"):
game = PongGame(game_id, self)
elif (game_type == "tictactoe"):
game = TicTacToeGame(game_id, self)
self._game_list.append(game)
return game

View File

@ -0,0 +1,39 @@
from __future__ import annotations
from ... import config
from .Position import Position
from .Point import Point
import time
import math
class Ball:
def __init__(self) -> None:
self.size: float
self.position: Position
self.angle: float
self.speed: float
self.reset()
self.speed = 0
def reset(self) -> None:
self.size = config.BALL_SIZE
self.position = Position(Point(config.BALL_SPAWN_POS_X + self.size / 2, config.BALL_SPAWN_POS_Y + self.size / 2), time.time())
self.angle = math.pi * 1
self.speed = config.BALL_SPEED_START
def to_dict(self) -> dict:
data: dict = {
"size": self.size,
"speed": self.speed,
"position": self.position.to_dict(),
"angle": self.angle,
}
return data
def __str__(self) -> str:
return f"Ball(size: {self.size}, speed: {self.speed}, angle: {self.angle}, position: {self.position})"

View File

@ -0,0 +1,32 @@
from __future__ import annotations
from math import dist
class Point:
def __init__(self, x: float, y: float) -> None:
self.x = x
self.y = y
def __str__(self) -> str:
return f"Point(x: {self.x}, y: {self.y})"
def __repr__(self) -> str:
return f"Point(x: {self.x}, y: {self.x})"
def __eq__(self, __value: object) -> bool:
return (self.x == __value.x and self.y == __value.y)
def distance(self, point: Point):
return dist((point.x, point.y), (self.x, self.y))
def copy(self):
return Point(self.x, self.y)
def to_dict(self) -> dict:
data: dict[str: float] = {
"x": self.x,
"y": self.y,
}
return data

View File

@ -0,0 +1,195 @@
from ..AGame import AGame
from .PongPlayer import PongPlayer
from .PongSpectator import PongSpectator
from .Wall import Wall
from .Segment import Segment
from .Point import Point
from .Ball import Ball
from ... import config
from channels.generic.websocket import WebsocketConsumer
from django.contrib.auth.models import User
from ...routine import routine
import threading
class PongGame(AGame):
def __init__(self, game_id: int, game_manager):
super().__init__("pong", game_id, game_manager)
self.players: list[PongPlayer]
self.walls: list[Wall]
players: list[User] = self.model.get_players()
nb_players: int = len(players)
if (nb_players == 2):
corners = [Point(50, config.MAP_CENTER_Y - config.MAP_SIZE_Y / 4),
Point(config.MAP_SIZE_X - 50, config.MAP_CENTER_Y - config.MAP_SIZE_Y / 4),
Point(config.MAP_SIZE_X - 50, config.MAP_CENTER_Y + config.MAP_SIZE_Y / 4),
Point(50, config.MAP_CENTER_Y + config.MAP_SIZE_Y / 4)
]
self.players = [PongPlayer(self, players[0], None, Segment(corners[1].copy(), corners[2].copy())),
PongPlayer(self, players[1], None, Segment(corners[0].copy(), corners[3].copy()))]
self.walls = [Wall(corners[0], corners[1]),
Wall(corners[2], corners[3])]
else:
corners: list[Point] = [Point(50, 50),
Point(config.MAP_SIZE_X - 50, 50),
Point(config.MAP_SIZE_X - 50, config.MAP_CENTER_Y - 50),
Point(50, config.MAP_SIZE_Y - 50)]
self.players = []
self.walls = []
for i in range(4):
if i < nb_players:
self.players.append(PongPlayer(self, players[i], None, Segment(corners[i], corners[(i + 1) % 4])))
else:
self.walls.append(Segment(corners[i], corners[(i + 1) % 4]))
self.ball: Ball = Ball()
def goal(self, goal_defenser: PongPlayer) -> None:
timestamp: int = goal_defenser.add_goal()
self.broadcast("goal", {"player_id": goal_defenser.user.pk,
"timestamp": timestamp})
if len(goal_defenser.score) >= config.GAME_MAX_SCORE:
self.eliminate(goal_defenser)
player_list = self.get_valid_players()
if len(player_list) == 1:
self.finish(player_list[0])
return
self.ball.reset()
self.broadcast("update_ball", self.ball.to_dict())
def get_valid_players(self) -> list[PongPlayer]:
return [player for player in self.players if player.is_connected and not player.is_eliminated]
def finish(self, winner: PongPlayer) -> bool:
self.broadcast("finish", {'winner_id': winner.user.pk})
self.model.finish(winner.user)
self.stopped = True
def start(self):
# Set to true to stop the thread routine
self.stopped: bool = False
self.model.start()
self.broadcast("start")
self.ball.reset()
self.broadcast("update_ball", self.ball.to_dict())
self.thread = threading.Thread(target=routine, args=(self,))
self.thread.start()
def eliminate(self, eliminated: PongPlayer):
self.broadcast("eliminated", {"eliminated_id": eliminated.user.pk})
eliminated.eliminate()
def _player_join(self, user: User, socket: WebsocketConsumer) -> PongPlayer | None:
if (self.model.started):
return None
player = self.get_player_by_user_id(user.pk)
if (player is None):
return None
# check if player is already connected
if (player.is_connected()):
player.disconnect(1001)
player.socket = socket
self.update_player(player)
if len(self.players) == len(self.get_players_connected()):
self.start()
return player
def _spectator_join(self, user: User, socket: WebsocketConsumer) -> PongSpectator:
spectator: PongSpectator = PongSpectator(user, socket, self)
self.spectators.append(spectator)
return spectator
def join(self, user: User, socket: WebsocketConsumer) -> PongSpectator | PongPlayer:
member: PongPlayer | PongSpectator
member = self._player_join(user, socket)
if member is None:
member = self._spectator_join(user, socket)
self._send_game_data(member)
return member
def _player_leave(self, player: PongPlayer):
if self.model.started:
self.eliminate(player)
players: list[PongPlayer] = self.get_valid_players()
if len(players) == 1:
self.finish(players[0])
def _spectator_leave(self, spectator: PongSpectator):
self.spectators.remove(spectator)
def leave(self, member: PongSpectator | PongPlayer):
if (isinstance(member, PongPlayer)):
self._player_leave(member)
elif (isinstance(member, PongSpectator)):
self._spectator_leave(member)
if self.model.started:
if len(self.get_players_connected()) + len(self.spectators) == 0:
self.stopped = True
if hasattr(self, 'thread'):
self.thread.join(10)
self.game_manager.remove(self)
def _send_game_data(self, member: PongSpectator | PongPlayer):
member.send("init_game", self.to_dict())
def update_player(self, player: PongPlayer):
self.broadcast("update_player", player.to_dict(), [player])
def to_dict(self):
data: dict = {"ball": self.ball.to_dict(),
"players": [player.to_dict() for player in self.players],
"walls": [wall.to_dict() for wall in self.walls],
}
return data

View File

@ -0,0 +1,132 @@
from __future__ import annotations
from ... import config
from channels.generic.websocket import WebsocketConsumer
from django.contrib.auth.models import User
from .Position import Position
from ..APlayer import APlayer
from .Segment import Segment
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from .PongGame import PongGame
class PongPlayer(APlayer):
def __init__(self, game: PongGame, user: User, socket: WebsocketConsumer, rail: Segment) -> None:
super().__init__(user, socket, game)
self.position: Position = Position(0.5, 0)
self.score: list[int] = []
self.rail: Segment = rail
self.is_eliminated: bool = False
self.game: PongGame
def eliminate(self):
self.is_eliminated = True
def receive(self, data: dict):
detail: str = data.get("detail")
if (detail is None):
return
if (detail == "update_my_paddle_pos"):
self.update_position(data)
def update_position(self, data: dict):
new_position: Position = Position(None)
position_dict = data.get("position")
if (position_dict is None):
self.send_error("missing position")
return
new_position.location = position_dict.get("location")
if (new_position.location is None):
self.send_error("missing location")
return
new_position.time = data.get("time")
if (new_position.time is None):
self.send_error("missing time")
return
if (self.position.time > new_position.time):
self.send_error("time error")
return
distance: float = abs(self.position.location - new_position.location)
sign: int = 1 if self.position.location >= new_position.location else -1
time_difference: float = (new_position.time - self.position.time) / 1000
max_distance: float = config.PADDLE_SPEED_PER_SECOND_MAX * (time_difference) * config.PADDLE_SPEED_PER_SECOND_TOLERANCE
new_position_verified: Position = new_position.copy()
if (distance > max_distance):
new_position_verified.location = self.position.location + max_distance * sign
if (not config.PADDLE_POSITION_MIN <= new_position_verified.location <= config.PADDLE_POSITION_MAX):
new_position_verified.location = max(new_position_verified.location, config.PADDLE_POSITION_MIN)
new_position_verified.location = min(new_position_verified.location, config.PADDLE_POSITION_MAX)
invalid_pos: bool = new_position.location != new_position_verified.location
if (new_position.location != self.position.location):
self.game.update_player(self)
self.position.location = new_position.location
if (invalid_pos):
self.send("update_player", self.to_dict())
def connect(self, socket: WebsocketConsumer):
self.socket = socket
self.accept()
self.game.update_player(self)
def disconnect(self, code: int = 1000):
self.socket = None
self.game.leave(self)
def add_goal(self):
timestamp = self.game.model.add_goal(self.user)
self.score.append(timestamp)
return timestamp
def to_dict(self) -> dict:
data = {
"username": self.user.username,
"id": self.user.pk,
"position": self.position.to_dict(),
"score": self.score,
"isEliminated": self.is_eliminated,
"rail": self.rail.to_dict(),
"isConnected": self.is_connected(),
}
return data

View File

@ -0,0 +1,29 @@
from __future__ import annotations
from channels.generic.websocket import WebsocketConsumer
from django.contrib.auth.models import User
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from .PongPlayer import PongPlayer
from .PongGame import PongGame
from ..ASpectator import ASpectator
from .Ball import Ball
class PongSpectator(ASpectator):
def __init__(self, user: User, socket: WebsocketConsumer, game: PongGame):
super().__init__(user, socket, game)
self.game: PongGame = game
def send_paddle(self, player: PongPlayer):
self.send("update_player", player.to_dict())
def send_ball(self, ball: Ball):
self.send("update_ball", ball.to_dict())
def disconnect(self, code: int = 1000):
self.game.leave(self)

View File

@ -0,0 +1,30 @@
from __future__ import annotations
from .Point import Point
class Position:
def __init__(self, location: int | Point = 0, time: int = 0) -> None:
self.time: float = time
self.location: float | Point = location
def copy(self):
try:
return Position(self.location.copy(), self.time)
except:
return Position(self.location, self.time)
def to_dict(self):
data: dict = {
"time": self.time,
}
try:
data.update({"location": self.location.to_dict()})
except:
data.update({"location": self.location})
return data
def __eq__(self, __value: Position) -> bool:
return (self.location == __value.location)

View File

@ -0,0 +1,38 @@
from .Point import Point
from .Vector import Vector
import math
class Segment:
def __init__(self, start: Point, stop: Point) -> None:
self.start: Point = start
self.stop: Point = stop
def angle(self) -> float:
return math.atan2((self.start.y - self.stop.y), (self.start.x - self.stop.x))
def length(self):
return self.start.distance(self.stop)
def is_on(self, C: Point):
return (self.start.distance(C) + self.stop.distance(C) == self.length())
def __repr__(self) -> str:
return f"Segment(start: {self.start}, stop: {self.stop})"
def __str__(self) -> str:
return f"Segment(start: {self.start}, stop: {self.stop})"
def copy(self):
return Segment(self.start.copy(), self.stop.copy())
def to_dict(self) -> dict:
data: dict[str: dict] = {
"start": self.start.to_dict(),
"stop": self.stop.to_dict(),
}
return data

View File

@ -0,0 +1,35 @@
from __future__ import annotations
import math
from .Point import Point
class Vector:
def __init__(self, x: float, y: float) -> None:
self.norm: float = math.dist((0, 0), (x, y))
self.x: float = x
self.y: float = y
def __truediv__(self, denominator: float):
return Vector(self.x / denominator, self.y / denominator)
def angle(self, vector: Vector):
scalar_product: float = self.scalar(vector)
if (scalar_product is None):
return None
cos: float = scalar_product / (vector.norm * self.norm)
angle: float = math.acos(cos)
return angle
def scalar(self, vector: Vector):
return self.x * vector.x + vector.y * self.y
def __str__(self) -> str:
return f"Vector(x: {self.x}, y: {self.y}, norme: {self.norm})"
def __eq__(self, __value: Vector) -> bool:
return (self.x == __value.x and
self.x == __value.x and
self.norm == __value.norm)

View File

@ -0,0 +1,5 @@
from .Segment import Segment
class Wall(Segment):
pass

View File

@ -0,0 +1,96 @@
from ..AGame import AGame
from channels.generic.websocket import WebsocketConsumer
from .TicTacToePlayer import TicTacToePlayer
from .TicTacToeSpectator import TicTacToeSpectator
import time
class TicTacToeGame(AGame):
def __init__(self, game_id: int, game_manager):
super().__init__("tictactoe", game_id, game_manager)
players: list[int] = self.model.get_players()
self.players: list[TicTacToePlayer] = [TicTacToePlayer(player, None, self, ["x", "o"][i]) for i, player in enumerate(players)]
self.time = -1
self.turn = 'x'
self._map = [[-1 for _ in range(9)] for _ in range(9)]
self.winner = None
def _everbody_is_here(self):
return len(self.players) == len(self.get_players_connected())
def _player_join(self, user_id: int, socket: WebsocketConsumer):
player = self.get_player_by_user_id(user_id)
if (player is None):
return None
# check if player is already connected
if (player.is_connected()):
player.disconnect(1001)
player.socket = socket
return player
def add(self, newmove, player):
if (self.checkMove(newmove, player) and self.checkWin() == False):
self._map[newmove.get("targetMorpion")][newmove.get("targetCase")] = newmove.get("sign")
player.currentMorpion = int(newmove.get("targetCase"))
self.changeTurn()
self.time = time.time()
return True
return False
def changeTurn(self):
self.turn = 'x' if (self.turn == 'o') else 'o'
def checkMove(self, newmove, player):
if (int(newmove.get("targetMorpion")) != player.currentMorpion or newmove.get("sign") != self.turn):
return False
if (self._map[newmove.get("targetMorpion")][newmove.get("targetCase")] != -1):
return False
return True
def checkWin(self):
for tab in self._map:
for i in range(3):
if tab[i] != -1 and tab[i] == tab[i + 3] and tab[i + 3] == tab[i + 6]:
return
for i in range(0, 9, 3):
if tab[i] != -1 and tab[i] == tab[i + 1] and tab[i + 1] == tab[i + 2]:
return tab[i]
if tab[0] != -1 and tab[0] == tab[4] and tab[4] == tab[8]:
return tab[0]
if tab[6] != -1 and tab[6] == tab[4] and tab[4] == tab[2]:
return tab[6]
return False
def _spectator_join(self, user_id: int, socket: WebsocketConsumer):
spectator:TicTacToeSpectator = TicTacToeSpectator(user_id, socket, self)
self.spectators.append(spectator)
return spectator
def join(self, user_id: int, socket: WebsocketConsumer) -> TicTacToeSpectator | TicTacToePlayer:
member: TicTacToePlayer = self._player_join(user_id, socket)
if (member is None):
member: TicTacToeSpectator = self._spectator_join(user_id, socket)
return member

View File

@ -0,0 +1,14 @@
from games.objects.AGame import AGame
from ..APlayer import APlayer
from django.contrib.auth.models import User
from channels.generic.websocket import WebsocketConsumer
class TicTacToePlayer(APlayer):
def __init__(self, user: User, socket: WebsocketConsumer, game: AGame, sign):
super().__init__(user, socket, game)
self.sign = sign
self.currentMorpion = 4
self.timestamp = None

View File

@ -0,0 +1,4 @@
from ..ASpectator import ASpectator
class TicTacToeSpectator(ASpectator):
pass

303
django/games/routine.py Normal file
View File

@ -0,0 +1,303 @@
from __future__ import annotations
from .objects.pong.PongGame import PongPlayer
from .objects.pong.Segment import Segment
from .objects.pong.Point import Point
from .objects.pong.Ball import Ball
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from .objects.pong.PongGame import PongGame
from . import config
import math
from asgiref.sync import SyncToAsync
import asyncio
VERTICALLY = 1
NORMAL = 2
def get_player_hitted(players: list[PongPlayer], segment: Segment) -> PongPlayer | None:
for player in players:
if (player.rail is segment):
return player
return None
def wall_collision(ball_angle: float, wall: Segment) -> float:
wall_angle: float = wall.angle()
cos: float = math.cos(wall_angle) * -1
sin: float = math.sin(wall_angle)
wall_angle: float = math.atan2(sin, cos)
incident_angle: float = ball_angle - wall_angle
reflection_angle: float = wall_angle - incident_angle
return reflection_angle
async def paddle_collision(game: PongGame, impact: Point, player: PongPlayer, inc_x: float, inc_y: float):
diff_x: float = player.rail.stop.x - player.rail.start.x
diff_y: float = player.rail.stop.y - player.rail.start.y
paddle_center_x: float = player.rail.start.x + diff_x * player.position.location
paddle_center_y: float = player.rail.start.y + diff_y * player.position.location
paddle_center: Point = Point(paddle_center_x, paddle_center_y)
rail_length: float = player.rail.length()
paddle_length: float = rail_length * config.PADDLE_RATIO
start_x: float = paddle_center.x - (diff_x * (paddle_length / 2 / rail_length))
start_y: float = paddle_center.y - (diff_y * (paddle_length / 2 / rail_length))
stop_x: float = paddle_center.x + (diff_x * (paddle_length / 2 / rail_length))
stop_y: float = paddle_center.y + (diff_y * (paddle_length / 2 / rail_length))
start: Point = Point(start_x, start_y)
stop: Point = Point(stop_x, stop_y)
paddle: Segment = Segment(start, stop)
hit_point: Point = Point(impact.x - inc_x, impact.y - inc_y)
if not paddle.is_on(hit_point):
await asyncio.sleep(0.1) # delay to create frontend animation
await SyncToAsync(game.goal)(player)
return None
paddle_angle: float = paddle.angle()
normal: float = paddle_angle - math.pi / 2
start_distance: float = paddle.start.distance(impact)
stop_distance: float = paddle.stop.distance(impact)
hit_percent: float = (start_distance) / (start_distance + stop_distance)
hit_percent = round(hit_percent, 1)
new_angle: float = normal + (math.pi * 0.85) * (hit_percent - 0.5)
return new_angle
async def collision(game: PongGame, impact_data: dict) -> bool:
segment: Segment = impact_data.get("segment")
player_hitted = None
for player in game.players:
if (not player.is_connected()):
continue
if (player.rail is segment):
player_hitted = player
break
angle: float
print(impact_data.get("impact"))
if (player_hitted is None):
print("wall")
angle = wall_collision(game.ball.angle, segment)
else:
print("paddle")
angle = await paddle_collision(game, impact_data.get("impact"), player_hitted, impact_data.get("inc_x"), impact_data.get("inc_y"))
if (angle is None):
return False
print((game.ball.angle % (math.pi * 2)) * 180 / math.pi, (angle % (math.pi * 2)) * 180 / math.pi)
game.ball.speed += config.BALL_SPEED_INC
game.ball.angle = angle
return True
async def update_ball(game: PongGame, impact_data: dict):
distance: float = impact_data.get("distance")
time_before_impact: float = distance / game.ball.speed
print(time_before_impact)
await asyncio.sleep(time_before_impact)
hit: bool = await collision(game, impact_data)
print("HIT" * 10 + str(hit))
if hit:
game.ball.position.location = impact_data.get("impact")
SyncToAsync(game.broadcast)("update_ball", game.ball.to_dict())
def get_sign(num: float) -> int:
if (num == 0):
return 0
if (num > 0):
return 1
if (num < 0):
return -1
def get_derive(segment: Segment) -> float:
if (segment.start.x == segment.stop.x):
return None
return (segment.stop.y - segment.start.y) / (segment.stop.x - segment.start.x)
def get_intercept(derive: float, point: Point) -> float:
if (derive is None):
return None
return point.y - (point.x * derive)
def get_constant(segment: Segment) -> float:
return segment.start.x
def identify(segment: Segment) -> str:
if (segment.start.x == segment.stop.x):
return VERTICALLY
return NORMAL
def get_interception(segment1: Segment, segment2: Segment):
if (identify(segment1) == VERTICALLY and identify(segment2) == VERTICALLY):
return None
# because of in matematics world y = 10 is above y = 5 and on a display it is inverted I invert the coordonate
inverted_segment1 = Segment(Point(segment1.start.x, config.MAP_SIZE_Y - segment1.start.y), Point(segment1.stop.x, config.MAP_SIZE_Y - segment1.stop.y))
inverted_segment2 = Segment(Point(segment2.start.x, config.MAP_SIZE_Y - segment2.start.y), Point(segment2.stop.x, config.MAP_SIZE_Y - segment2.stop.y))
if (identify(segment1) == NORMAL and identify(segment2) == NORMAL):
# representation m * x + p
m1 = get_derive(inverted_segment1)
m2 = get_derive(inverted_segment2)
p1 = get_intercept(m1, inverted_segment1.start)
p2 = get_intercept(m2, inverted_segment2.start)
# m1 * x + p1 = m2 * x + p2
# m1 * x = m2 * x + p2 -p1
# m1 * x - m2 * x = p1 - p2
# x * (m1 - m2) = p1 - p2
# x = (p1 - p2) / (m1 - m2)
if (m1 == m2):
return None
# reinvert
x: float = (p1 - p2) / (m1 - m2) * (-1)
y: float = config.MAP_SIZE_Y - (m1 * x + p1)
else:
if (identify(inverted_segment1) == VERTICALLY):
constant: float = get_constant(inverted_segment1)
m: float = get_derive(inverted_segment2)
p: float = get_intercept(m, inverted_segment2.start)
else:
constant: float = get_constant(inverted_segment2)
m: float = get_derive(inverted_segment1)
p: float = get_intercept(m, inverted_segment1.start)
x: float = constant
y: float = config.MAP_SIZE_Y - (m * x + p)
impact: Point = Point(x, y)
return impact
def get_impact_data(segments: list[Segment], ball: Ball) -> dict:
cos: float = round(math.cos(ball.angle), 6)
sin: float = round(math.sin(ball.angle), 6)
inc_x: float = (-1) * get_sign(cos) * (config.STROKE_THICKNESS + config.BALL_SIZE / 2)
inc_y: float = get_sign(sin) * (config.STROKE_THICKNESS + config.BALL_SIZE / 2)
point: Point = Point(ball.position.location.x + cos, ball.position.location.y - sin)
ball_segment = Segment(ball.position.location, point)
closest: dict = None
for segment in segments:
segment_with_padding = segment.copy()
segment_with_padding.start.x += inc_x
segment_with_padding.stop.x += inc_x
segment_with_padding.start.y += inc_y
segment_with_padding.stop.y += inc_y
impact: Point = get_interception(segment_with_padding, ball_segment)
if (impact is None):
continue
diff_x: float = ball.position.location.x - impact.x
if (get_sign(diff_x) == get_sign(cos) and cos != 0):
continue
diff_y: float = (ball.position.location.y - impact.y)
if (get_sign(diff_y) != get_sign(sin) and sin != 0):
continue
distance: float = impact.distance(ball.position.location)
if (closest is None or distance < closest.get("distance")):
closest = {
"inc_x": inc_x,
"inc_y": inc_y,
"impact": impact,
"segment": segment,
"distance": distance,
}
return closest
async def render_ball(game: PongGame):
segments: list[Segment] = [player.rail for player in game.players] + game.walls
while True:
impact_data: dict = get_impact_data(segments, game.ball)
await update_ball(game, impact_data)
async def async_routine(game: PongGame):
#TODO DEBUG collision
ball_routine = asyncio.create_task(render_ball(game))
while True:
if game.stopped:
ball_routine.cancel()
return
await asyncio.sleep(0.05)
def routine(game: PongGame):
asyncio.run(async_routine(game))

7
django/games/routing.py Normal file
View File

@ -0,0 +1,7 @@
from django.urls import re_path
from . import consumers
websocket_urlpatterns = [
re_path(r'ws/games/pong/(?P<game_id>\d+)$', consumers.PongWebSocket.as_asgi()),
re_path(r'ws/games/tictactoe/(?P<game_id>\d+)$', consumers.TicTacToeWebSocket.as_asgi()),
]

View File

@ -0,0 +1,37 @@
from rest_framework import serializers
from django.contrib.auth.models import User
from django.db.models import QuerySet
from .models import GameModel
from profiles.serializers import ProfileSerializer
class GameSerializer(serializers.ModelSerializer):
players = serializers.SerializerMethodField()
winner = serializers.SerializerMethodField()
state = serializers.SerializerMethodField()
started = serializers.ReadOnlyField()
finished = serializers.ReadOnlyField()
start_timestamp = serializers.ReadOnlyField()
stop_timestamp = serializers.ReadOnlyField()
game_type = serializers.ReadOnlyField()
class Meta:
model = GameModel
fields = ["id", "winner", "state", "started", "finished", "players", "start_timestamp", "stop_timestamp", "game_type"]
def get_state(self, instance: GameModel):
if (instance.finished):
return "finished"
if (instance.started):
return "started"
return "waiting"
def get_winner(self, instance: GameModel):
if (instance.winner is None):
return None
return ProfileSerializer(instance.winner.profilemodel).data
def get_players(self, instance: GameModel):
return ProfileSerializer(instance.get_players_profiles(), many=True).data

3
django/games/tests.py Normal file
View File

@ -0,0 +1,3 @@
from django.test import TestCase
# Create your tests here.

13
django/games/urls.py Normal file
View File

@ -0,0 +1,13 @@
from django.urls import path, re_path
from django.conf import settings
from django.conf.urls.static import static
from .GameViewSet import GameViewSet
from .GameHistoryViewSet import GameHistoryView
from .GameConfigView import GameConfigView
urlpatterns = [
path("<int:pk>", GameViewSet.as_view({"get": "retrieve"}), name="game_page"),
path("history/<int:pk>", GameHistoryView.as_view({"get": "retrive"}), name="history_page"),
path("", GameConfigView.as_view(), name = "game_config")
]