From 8757ad1c12559b09e10c746bbc202e8f84a4160d Mon Sep 17 00:00:00 2001 From: Steven Date: Thu, 13 Jun 2024 22:54:25 -0700 Subject: [PATCH] Do more code formatting --- src/environment.py | 796 +++++++++++++++-------------- src/evaluator.py | 1136 ++++++++++++++++++++++------------------- src/fast_evaluator.py | 35 +- src/holdem.py | 887 ++++++++++++++++---------------- src/rps.py | 119 ++--- src/table.py | 17 +- src/treys_test.py | 4 +- src/utils.py | 490 +++++++++--------- 8 files changed, 1809 insertions(+), 1675 deletions(-) diff --git a/src/environment.py b/src/environment.py index b761d7d..4317150 100644 --- a/src/environment.py +++ b/src/environment.py @@ -1,97 +1,107 @@ # The Poker Environment from evaluator import * from typing import List -from holdem import HoldEmHistory, HoldemInfoSet # To get the legal actions - -class Player(): # This is the POV - def __init__(self, balance) -> None: - self.is_AI = False - - self.hand: List[Card] = [] # The hand is also known as hole cards: https://en.wikipedia.org/wiki/Texas_hold_%27em - self.player_balance: int = balance # TODO: Important that this value cannot be modified easily... - self.current_bet = 0 - self.playing_current_round = True - - # Wellformedness, hand is always either 0 or 2 cards - def add_card_to_hand(self, card: Card): - self.hand.append(card) - assert(len(self.hand) <= 2) - - def clear_hand(self): - self.hand = [] - - def place_bet(self, action: str, observed_env) -> int: - hist : HoldEmHistory = observed_env.history - legal_actions = hist.actions() - if action not in legal_actions: - raise Exception(f"Invalid Action: {action}") - - current_game_stage_history, stage = hist.get_current_game_stage_history() - - if action == 'k': # check - if stage == 'preflop': - self.current_bet = 2 # BB - else: - self.current_bet = 0 - - elif action == 'c': - # If you call on the preflop - if len(hist.history) == 2: - self.current_bet = 2 - else: - self.current_bet = int(hist.history[-1][1:]) - - elif action[0] == 'b': # bet X amount - self.current_bet = int(action[1:]) - return action - - def calculate_pot_odds(self): # Calculate Pot Odds helper function, basically look at how many hands can you currently beat - """ - Simple logic, does not account for the pot values. - """ +from holdem import HoldEmHistory, HoldemInfoSet # To get the legal actions + + +class Player: # This is the POV + def __init__(self, balance) -> None: + self.is_AI = False + + self.hand: List[Card] = ( + [] + ) # The hand is also known as hole cards: https://en.wikipedia.org/wiki/Texas_hold_%27em + self.player_balance: int = ( + balance # TODO: Important that this value cannot be modified easily... + ) + self.current_bet = 0 + self.playing_current_round = True + + # Wellformedness, hand is always either 0 or 2 cards + def add_card_to_hand(self, card: Card): + self.hand.append(card) + assert len(self.hand) <= 2 + + def clear_hand(self): + self.hand = [] + + def place_bet(self, action: str, observed_env) -> int: + hist: HoldEmHistory = observed_env.history + legal_actions = hist.actions() + if action not in legal_actions: + raise Exception(f"Invalid Action: {action}") + + current_game_stage_history, stage = hist.get_current_game_stage_history() + + if action == "k": # check + if stage == "preflop": + self.current_bet = 2 # BB + else: + self.current_bet = 0 + + elif action == "c": + # If you call on the preflop + if len(hist.history) == 2: + self.current_bet = 2 + else: + self.current_bet = int(hist.history[-1][1:]) + + elif action[0] == "b": # bet X amount + self.current_bet = int(action[1:]) + return action + + def calculate_pot_odds( + self, + ): # Calculate Pot Odds helper function, basically look at how many hands can you currently beat + """ + Simple logic, does not account for the pot values. + """ + class AIPlayer(Player): - def __init__(self, balance) -> None: - super().__init__(balance) - self.is_AI = True - - # We are going to have the dumbest AI possible, which is to call every time - def place_bet(self, observed_env) -> int: # AI will call every time - # Very similar function to Player.place_bet, we only call and check - action = 'k' - hist : HoldEmHistory = observed_env.history - legal_actions = hist.actions() - if action not in legal_actions: - action = 'c' - - if action not in legal_actions: - raise Exception('AI found no legal actions', hist.actions()) - - current_game_stage_history, stage = hist.get_current_game_stage_history() - if action == 'k': # check - if stage == 'preflop': - self.current_bet = 2 - else: - self.current_bet = 0 - - elif action == 'c': - # If you call on the preflop - if len(hist.history) == 2: - self.current_bet = observed_env.big_blind - else: # Set the current bet to the amount of the last bet - self.current_bet = int(hist.history[-1][1:]) - - return action - -class PokerEnvironment(): - """ - Also see the HoldEmHistory class in holdem.py, which defines the set of legal actions every time - """ - def __init__(self) -> None: - self.players: List[Player] = [] - self.deck = Deck() - - """Game Stages: + def __init__(self, balance) -> None: + super().__init__(balance) + self.is_AI = True + + # We are going to have the dumbest AI possible, which is to call every time + def place_bet(self, observed_env) -> int: # AI will call every time + # Very similar function to Player.place_bet, we only call and check + action = "k" + hist: HoldEmHistory = observed_env.history + legal_actions = hist.actions() + if action not in legal_actions: + action = "c" + + if action not in legal_actions: + raise Exception("AI found no legal actions", hist.actions()) + + current_game_stage_history, stage = hist.get_current_game_stage_history() + if action == "k": # check + if stage == "preflop": + self.current_bet = 2 + else: + self.current_bet = 0 + + elif action == "c": + # If you call on the preflop + if len(hist.history) == 2: + self.current_bet = observed_env.big_blind + else: # Set the current bet to the amount of the last bet + self.current_bet = int(hist.history[-1][1:]) + + return action + + +class PokerEnvironment: + """ + Also see the HoldEmHistory class in holdem.py, which defines the set of legal actions every time + """ + + def __init__(self) -> None: + self.players: List[Player] = [] + self.deck = Deck() + + """Game Stages: 1: Starting a new round, giving players their cards. Automatically goes into state 2 2: Preflop betting round. Goes into state 3 once everyone has made their decision 3: Flop round. Goes into turn (state 4) /ends round (state 6) once everyone " " @@ -101,299 +111,315 @@ def __init__(self) -> None: Game Stage - 2 = number of "/" in the holdem infoset and history """ - self.game_stage = 1 # To keep track of which phase of the game we are at, new_round is 0 -# If self.finished_playing_game_stage = True, we can move to the next game state. This is needed to go around each player and await their decision - self.finished_playing_game_stage = False - - # Changes every round - self.dealer_button_position = 0 # This button will move every round - self.total_pot_balance = 0 # keep track of pot size of total round - self.stage_pot_balance = 0 # keep track of pot size for current round - self.community_cards: List[Card] = [] # a.k.a. the board - self.position_in_play = 0 - - self.first_player_to_place_highest_bet = 0 # This is to keep track of who is the first player to have placed the highest bet, so we know when to end the round - - # These values should rarely change. TODO: Figure out how to integrate with holdem.py - self.new_player_balance = 100 - self.small_blind = 1 - self.big_blind = 2 - self.history: HoldEmHistory = HoldEmHistory() # THis will be the history that will be fed into the AI - - self.players_balance_history = [] # List of "n" list for "n" players - - def add_player(self): - self.players.append(Player(self.new_player_balance)) - - def get_player(self, idx) -> Player: - return self.players[idx] - - def add_AI_player(self): # Add a dumb AI - self.players.append(AIPlayer(self.new_player_balance)) - - def get_winning_players(self) -> List: - # If there is more than one winning player, the pot is split. We assume that we only run things once - winning_players: List = [] - for player in self.players: - if player.playing_current_round: - winning_players.append(player) - - return winning_players - - def get_winning_players_idx(self) -> List: - # If there is more than one winning player, the pot is split. We assume that we only run things once - winning_players: List = [] - for idx, player in enumerate(self.players): - if player.playing_current_round: - winning_players.append(idx) - - return winning_players - - def distribute_pot_to_winning_players(self): # Run when self.game_stage = 5 - winning_players = self.get_winning_players() - - pot_winning = self.total_pot_balance / len(winning_players) - for player in winning_players: - player.player_balance += pot_winning - - # Used for graphing later - for idx, player in enumerate(self.players): - # TODO: To be changed if you want to keep the balance history until the very end - try: - self.players_balance_history[idx].append(int(player.player_balance - self.new_player_balance)) - except: - self.players_balance_history.append([]) - self.players_balance_history[idx].append(int(player.player_balance - self.new_player_balance)) - - self.total_pot_balance = 0 # Reset the pot just to be safe - self.stage_pot_balance = 0 # Reset the pot just to be safe - - def count_remaining_players_in_round(self): - # Helper function to count the total number of players still in the round - total = 0 - for player in self.players: - if player.playing_current_round: - total += 1 - return total - - def print_board(self): - for card in self.community_cards: - card.print() - - def start_new_round(self): - self.showdown = False - assert(len(self.players) >= 2) # We cannot start a poker round with less than 2 players... - - # Reset Players - for player in self.players: - player.playing_current_round = True - player.current_bet = 0 - player.clear_hand() - # TODO: Remove this when you are ready - player.player_balance = self.new_player_balance - - # Reset Deck (shuffles it as well), reset pot size - self.deck.reset_deck() - self.community_cards = [] - self.stage_pot_balance = 0 - self.total_pot_balance = 0 - - self.history = HoldEmHistory() # Reset the history - - - # Move the dealer position and assign the new small and big blinds - self.dealer_button_position += 1 - self.dealer_button_position %= len(self.players) - - # Big Blind - self.players[((self.dealer_button_position + 1) % len(self.players))].current_bet = self.big_blind - - # Small Blind - self.players[((self.dealer_button_position + 2) % len(self.players))].current_bet = self.small_blind - - self.update_stage_pot_balance() - # 3. Deal Cards - # We start dealing with the player directly clockwise of the dealer button - position_to_deal = self.dealer_button_position + 1 - - for _ in range(len(self.players)): - position_to_deal %= len(self.players) - card_str = '' - for _ in range(2): - card = self.deck.draw() - card_str += str(card) - self.players[position_to_deal].add_card_to_hand(card) - - - self.history += card_str - position_to_deal += 1 - - self.finished_playing_game_stage = True - - def update_stage_pot_balance(self): - """ - Assumes the balances from the players are correct - - """ - self.stage_pot_balance = 0 - for player in self.players: - self.stage_pot_balance += player.current_bet - - def play_current_stage(self, action: str = ""): - self.update_stage_pot_balance() - if self.players[self.position_in_play].is_AI: - action = self.players[self.position_in_play].place_bet(self) # Pass the Environment as an argument - self.history += action - - else: # Real player's turn - if action == "": # No decision has yet been made - return - else: - self.players[self.position_in_play].place_bet(action, self) - # Update the history - self.history += action - - if (action[0] == 'b'): - self.first_player_to_place_highest_bet = self.position_in_play - - elif action == "f": - self.players[self.position_in_play].playing_current_round = False # Player has folded - self.update_stage_pot_balance() - - - if self.count_remaining_players_in_round() == 1: # Round is over, distribute winnings - self.finished_playing_game_stage = True - self.game_stage = 6 - return - else: - self.move_to_next_player() - - if self.position_in_play == self.first_player_to_place_highest_bet: # Stage is over, move to the next stage (see flop) - self.finished_playing_game_stage = True - - def move_to_next_player(self): - assert(self.count_remaining_players_in_round() > 1) - self.position_in_play += 1 - self.position_in_play %= len(self.players) - - while (not self.players[self.position_in_play].playing_current_round): - self.position_in_play += 1 - self.position_in_play %= len(self.players) - - def play_preflop(self): - """ - About the small blind position: - The "small blind" is placed by the player to the left of the dealer button and the "big blind" is then posted by the next player to the left. - The one exception is when there are only two players (a "heads-up" game), when the player on the button is the small blind, and the other player is the big blind. - """ - if len(self.players) == 2: - self.position_in_play = self.dealer_button_position - else: - self.position_in_play = (self.dealer_button_position + 3) % len(self.players) - self.first_player_to_place_highest_bet = self.position_in_play - - self.finished_playing_game_stage = False - - def play_flop(self): - # 3. Flop - self.history += '/' - - self.deck.draw() # We must first burn one card, TODO: Show on video - - cards = '' - for _ in range(3): # Draw 3 cards - self.community_cards.append(self.deck.draw()) - cards += str(self.community_cards[-1]) - - self.history += cards - - # The person that should play is the first person after the dealer position - self.position_in_play = self.dealer_button_position - self.move_to_next_player() - self.first_player_to_place_highest_bet = self.position_in_play - - self.finished_playing_game_stage = False - - def play_turn(self): - # 4. Turn - self.history += '/' - - self.deck.draw() # We must first burn one card, TODO: Show on video - self.community_cards.append(self.deck.draw()) - - self.history += str(self.community_cards[-1]) - - # The person that should play is the first person after the dealer position - self.position_in_play = self.dealer_button_position - self.move_to_next_player() - self.first_player_to_place_highest_bet = self.position_in_play - - self.finished_playing_game_stage = False - - def play_river(self): - # 5. River - self.history += '/' - - self.deck.draw() # We must first burn one card, TODO: Show on video - self.community_cards.append(self.deck.draw()) - - self.history += str(self.community_cards[-1]) - - self.finished_playing_game_stage = False - - def update_player_balances_at_end_of_stage(self): - for player in self.players: - player.player_balance -= player.current_bet - player.current_bet = 0 - - def move_stage_to_total_pot_balance(self): - self.total_pot_balance += self.stage_pot_balance - self.stage_pot_balance = 0 - - def handle_game_stage(self, action=""): - if self.finished_playing_game_stage: - if self.game_stage != 1: - self.update_player_balances_at_end_of_stage() - self.move_stage_to_total_pot_balance() - self.game_stage += 1 - - if self.game_stage == 2: - self.play_preflop() - elif self.game_stage == 3: - self.play_flop() - elif self.game_stage == 4: - self.play_turn() - elif self.game_stage == 5: - self.play_river() - else: - if self.game_stage == 6: # We reached the river, and are now in the showdown. We need the evaluator to get the winners, set all losers to playing_current_round false - self.showdown = True - evaluator = Evaluator() - - indices_of_potential_winners = [] - for idx, player in enumerate(self.players): - if player.playing_current_round: - indices_of_potential_winners.append(idx) - hand = CombinedHand(self.community_cards + player.hand) - evaluator.add_hands(hand) - - winners = evaluator.get_winner() - for player in self.players: - player.playing_current_round = False - - for winner in winners: - self.players[indices_of_potential_winners[winner]].playing_current_round = True - - self.game_stage = 1 - self.finished_playing_game_stage = False # on the next call of the handler, we will start a new round - else: - if self.game_stage == 1: - # This function was put here instead of at game_stage == 6 to visualize the game - self.distribute_pot_to_winning_players() - self.start_new_round() - else: - self.play_current_stage(action) - - def end_of_round(self): - return self.game_stage == 1 and self.finished_playing_game_stage == False - \ No newline at end of file + self.game_stage = 1 # To keep track of which phase of the game we are at, new_round is 0 + # If self.finished_playing_game_stage = True, we can move to the next game state. This is needed to go around each player and await their decision + self.finished_playing_game_stage = False + + # Changes every round + self.dealer_button_position = 0 # This button will move every round + self.total_pot_balance = 0 # keep track of pot size of total round + self.stage_pot_balance = 0 # keep track of pot size for current round + self.community_cards: List[Card] = [] # a.k.a. the board + self.position_in_play = 0 + + self.first_player_to_place_highest_bet = 0 # This is to keep track of who is the first player to have placed the highest bet, so we know when to end the round + + # These values should rarely change. TODO: Figure out how to integrate with holdem.py + self.new_player_balance = 100 + self.small_blind = 1 + self.big_blind = 2 + self.history: HoldEmHistory = ( + HoldEmHistory() + ) # THis will be the history that will be fed into the AI + + self.players_balance_history = [] # List of "n" list for "n" players + + def add_player(self): + self.players.append(Player(self.new_player_balance)) + + def get_player(self, idx) -> Player: + return self.players[idx] + + def add_AI_player(self): # Add a dumb AI + self.players.append(AIPlayer(self.new_player_balance)) + + def get_winning_players(self) -> List: + # If there is more than one winning player, the pot is split. We assume that we only run things once + winning_players: List = [] + for player in self.players: + if player.playing_current_round: + winning_players.append(player) + + return winning_players + + def get_winning_players_idx(self) -> List: + # If there is more than one winning player, the pot is split. We assume that we only run things once + winning_players: List = [] + for idx, player in enumerate(self.players): + if player.playing_current_round: + winning_players.append(idx) + + return winning_players + + def distribute_pot_to_winning_players(self): # Run when self.game_stage = 5 + winning_players = self.get_winning_players() + + pot_winning = self.total_pot_balance / len(winning_players) + for player in winning_players: + player.player_balance += pot_winning + + # Used for graphing later + for idx, player in enumerate(self.players): + # TODO: To be changed if you want to keep the balance history until the very end + try: + self.players_balance_history[idx].append( + int(player.player_balance - self.new_player_balance) + ) + except: + self.players_balance_history.append([]) + self.players_balance_history[idx].append( + int(player.player_balance - self.new_player_balance) + ) + + self.total_pot_balance = 0 # Reset the pot just to be safe + self.stage_pot_balance = 0 # Reset the pot just to be safe + + def count_remaining_players_in_round(self): + # Helper function to count the total number of players still in the round + total = 0 + for player in self.players: + if player.playing_current_round: + total += 1 + return total + + def print_board(self): + for card in self.community_cards: + card.print() + + def start_new_round(self): + self.showdown = False + assert len(self.players) >= 2 # We cannot start a poker round with less than 2 players... + + # Reset Players + for player in self.players: + player.playing_current_round = True + player.current_bet = 0 + player.clear_hand() + # TODO: Remove this when you are ready + player.player_balance = self.new_player_balance + + # Reset Deck (shuffles it as well), reset pot size + self.deck.reset_deck() + self.community_cards = [] + self.stage_pot_balance = 0 + self.total_pot_balance = 0 + + self.history = HoldEmHistory() # Reset the history + + # Move the dealer position and assign the new small and big blinds + self.dealer_button_position += 1 + self.dealer_button_position %= len(self.players) + + # Big Blind + self.players[((self.dealer_button_position + 1) % len(self.players))].current_bet = ( + self.big_blind + ) + + # Small Blind + self.players[((self.dealer_button_position + 2) % len(self.players))].current_bet = ( + self.small_blind + ) + + self.update_stage_pot_balance() + # 3. Deal Cards + # We start dealing with the player directly clockwise of the dealer button + position_to_deal = self.dealer_button_position + 1 + + for _ in range(len(self.players)): + position_to_deal %= len(self.players) + card_str = "" + for _ in range(2): + card = self.deck.draw() + card_str += str(card) + self.players[position_to_deal].add_card_to_hand(card) + + self.history += card_str + position_to_deal += 1 + + self.finished_playing_game_stage = True + + def update_stage_pot_balance(self): + """ + Assumes the balances from the players are correct + + """ + self.stage_pot_balance = 0 + for player in self.players: + self.stage_pot_balance += player.current_bet + + def play_current_stage(self, action: str = ""): + self.update_stage_pot_balance() + if self.players[self.position_in_play].is_AI: + action = self.players[self.position_in_play].place_bet( + self + ) # Pass the Environment as an argument + self.history += action + + else: # Real player's turn + if action == "": # No decision has yet been made + return + else: + self.players[self.position_in_play].place_bet(action, self) + # Update the history + self.history += action + + if action[0] == "b": + self.first_player_to_place_highest_bet = self.position_in_play + + elif action == "f": + self.players[self.position_in_play].playing_current_round = False # Player has folded + self.update_stage_pot_balance() + + if self.count_remaining_players_in_round() == 1: # Round is over, distribute winnings + self.finished_playing_game_stage = True + self.game_stage = 6 + return + else: + self.move_to_next_player() + + if ( + self.position_in_play == self.first_player_to_place_highest_bet + ): # Stage is over, move to the next stage (see flop) + self.finished_playing_game_stage = True + + def move_to_next_player(self): + assert self.count_remaining_players_in_round() > 1 + self.position_in_play += 1 + self.position_in_play %= len(self.players) + + while not self.players[self.position_in_play].playing_current_round: + self.position_in_play += 1 + self.position_in_play %= len(self.players) + + def play_preflop(self): + """ + About the small blind position: + The "small blind" is placed by the player to the left of the dealer button and the "big blind" is then posted by the next player to the left. + The one exception is when there are only two players (a "heads-up" game), when the player on the button is the small blind, and the other player is the big blind. + """ + if len(self.players) == 2: + self.position_in_play = self.dealer_button_position + else: + self.position_in_play = (self.dealer_button_position + 3) % len(self.players) + self.first_player_to_place_highest_bet = self.position_in_play + + self.finished_playing_game_stage = False + + def play_flop(self): + # 3. Flop + self.history += "/" + + self.deck.draw() # We must first burn one card, TODO: Show on video + + cards = "" + for _ in range(3): # Draw 3 cards + self.community_cards.append(self.deck.draw()) + cards += str(self.community_cards[-1]) + + self.history += cards + + # The person that should play is the first person after the dealer position + self.position_in_play = self.dealer_button_position + self.move_to_next_player() + self.first_player_to_place_highest_bet = self.position_in_play + + self.finished_playing_game_stage = False + + def play_turn(self): + # 4. Turn + self.history += "/" + + self.deck.draw() # We must first burn one card, TODO: Show on video + self.community_cards.append(self.deck.draw()) + + self.history += str(self.community_cards[-1]) + + # The person that should play is the first person after the dealer position + self.position_in_play = self.dealer_button_position + self.move_to_next_player() + self.first_player_to_place_highest_bet = self.position_in_play + + self.finished_playing_game_stage = False + + def play_river(self): + # 5. River + self.history += "/" + + self.deck.draw() # We must first burn one card, TODO: Show on video + self.community_cards.append(self.deck.draw()) + + self.history += str(self.community_cards[-1]) + + self.finished_playing_game_stage = False + + def update_player_balances_at_end_of_stage(self): + for player in self.players: + player.player_balance -= player.current_bet + player.current_bet = 0 + + def move_stage_to_total_pot_balance(self): + self.total_pot_balance += self.stage_pot_balance + self.stage_pot_balance = 0 + + def handle_game_stage(self, action=""): + if self.finished_playing_game_stage: + if self.game_stage != 1: + self.update_player_balances_at_end_of_stage() + self.move_stage_to_total_pot_balance() + self.game_stage += 1 + + if self.game_stage == 2: + self.play_preflop() + elif self.game_stage == 3: + self.play_flop() + elif self.game_stage == 4: + self.play_turn() + elif self.game_stage == 5: + self.play_river() + else: + if ( + self.game_stage == 6 + ): # We reached the river, and are now in the showdown. We need the evaluator to get the winners, set all losers to playing_current_round false + self.showdown = True + evaluator = Evaluator() + + indices_of_potential_winners = [] + for idx, player in enumerate(self.players): + if player.playing_current_round: + indices_of_potential_winners.append(idx) + hand = CombinedHand(self.community_cards + player.hand) + evaluator.add_hands(hand) + + winners = evaluator.get_winner() + for player in self.players: + player.playing_current_round = False + + for winner in winners: + self.players[indices_of_potential_winners[winner]].playing_current_round = ( + True + ) + + self.game_stage = 1 + self.finished_playing_game_stage = ( + False # on the next call of the handler, we will start a new round + ) + else: + if self.game_stage == 1: + # This function was put here instead of at game_stage == 6 to visualize the game + self.distribute_pot_to_winning_players() + self.start_new_round() + else: + self.play_current_stage(action) + + def end_of_round(self): + return self.game_stage == 1 and self.finished_playing_game_stage == False diff --git a/src/evaluator.py b/src/evaluator.py index 22640f2..e0d7608 100644 --- a/src/evaluator.py +++ b/src/evaluator.py @@ -33,13 +33,13 @@ BIT_POSITION_TABLE = generate_table() -CARD_SUITS = ["Clubs", "Diamonds", "Hearts","Spades"] -CARD_SUITS_DICT = {"Clubs": 0, "Diamonds": 1, "Hearts": 2,"Spades": 3} +CARD_SUITS = ["Clubs", "Diamonds", "Hearts", "Spades"] +CARD_SUITS_DICT = {"Clubs": 0, "Diamonds": 1, "Hearts": 2, "Spades": 3} -BIT_MASK_1 = int('0x11111111111111', 16) # 0x111...1 -BIT_MASK_2 = int('0x22222222222222', 16) # 0x222...2 -BIT_MASK_4 = int('0x44444444444444', 16) # 0x444...4 -BIT_MASK_8 = int('0x88888888888888', 16) # 0x888...8 +BIT_MASK_1 = int("0x11111111111111", 16) # 0x111...1 +BIT_MASK_2 = int("0x22222222222222", 16) # 0x222...2 +BIT_MASK_4 = int("0x44444444444444", 16) # 0x444...4 +BIT_MASK_8 = int("0x88888888888888", 16) # 0x888...8 BIT_MASKS = [BIT_MASK_1, BIT_MASK_2, BIT_MASK_4, BIT_MASK_8] """ For CARDS_BIT_SUITS_DICT, we have @@ -50,540 +50,610 @@ """ CARD_BIT_SUITS_DICT = {1: "Clubs", 2: "Diamonds", 4: "Hearts", 8: "Spades"} -CARD_RANKS = [i for i in range(2, 15)] # Jack = 11, Queen = 12, King = 13, IMPORTANT: Ace = 14 since we use that for sorting -CARD_SUITS = ["Clubs", "Diamonds", "Hearts","Spades"] - -RANK_KEY = {"A": 14, "2": 2, "3": 3, "4":4, "5":5, "6":6, # Supports both "T" and "10" as 10 - "7": 7, "8": 8, "9": 9, "T": 10, "10":10, "J": 11, "Q": 12, "K":13} +CARD_RANKS = [ + i for i in range(2, 15) +] # Jack = 11, Queen = 12, King = 13, IMPORTANT: Ace = 14 since we use that for sorting +CARD_SUITS = ["Clubs", "Diamonds", "Hearts", "Spades"] + +RANK_KEY = { + "A": 14, + "2": 2, + "3": 3, + "4": 4, + "5": 5, + "6": 6, # Supports both "T" and "10" as 10 + "7": 7, + "8": 8, + "9": 9, + "T": 10, + "10": 10, + "J": 11, + "Q": 12, + "K": 13, +} # INVERSE_RANK_KEY = {14: "A", 2: "02", 3: "03", 4:"04", 5:"05", 6:"06", # 7:"07", 8:"08", 9:"09", 10:"10", 11: "J", 12: "Q", 13: "K"} -INVERSE_RANK_KEY = {14: "A", 2: "2", 3: "3", 4:"4", 5:"5", 6:"6", - 7:"7", 8:"8", 9:"9", 10:"T", 11: "J", 12: "Q", 13: "K"} - -SUIT_KEY = {"c": "Clubs", "d": "Diamonds", "h":"Hearts","s": "Spades"} - - -class Card(): - """ - You can initialize cards two ways: - - (RECOMMENDED) Card("Ah") - - Card(10, "Spades") - - """ - # Immutable after it has been initialized - def __init__(self,rank_suit=None, rank=14, suit="Spades", generate_random=False) -> None: - - if rank_suit: # Ex: "KD" (King of diamonds), "10H" (10 of Hearts), - self.__rank = RANK_KEY[rank_suit[:-1]] - self.__suit = SUIT_KEY[rank_suit[-1].lower()] - - else: - self.__rank = rank - assert(self.__rank >= 2 and self.__rank <= 14) - self.__suit = suit - - if generate_random: # If we want to just generate a random card - self.__rank = random.choice(CARD_RANKS) - self.__suit = random.choice(CARD_SUITS) - - # Check validity of card TODO: Maybe put into separate function to check wellformedness - if self.__rank not in CARD_RANKS: - raise Exception("Invalid Rank: {}".format(self.__rank)) - if self.__suit not in CARD_SUITS: - raise Exception("Invalid Suit: {}".format(self.__suit)) - - @property - def rank(self): - return self.__rank - - @property - def suit(self): - return self.__suit - - @property - def idx(self): - """ - [AC, AD, AH, AS, 2C, 2D, ... KH, KS] - 0 . 1 . 2 . 3 . 4 . 5 . 50, 51 - """ - rank = self.__rank - if self.__rank == 14: # for the aces - rank = 1 - rank -= 1 - return rank*4 + CARD_SUITS_DICT[self.__suit] - - def __str__(self): # Following the Treys format of printing - return INVERSE_RANK_KEY[self.rank] + self.suit[0].lower() - - -class Deck(): - def __init__(self) -> None: # Create a new full deck - self.__cards: List[Card] = [] - self.reset_deck() - - def shuffle(self): - random.shuffle(self.__cards) - - def reset_deck(self): - self.__cards = [] - for rank in CARD_RANKS: - for suit in CARD_SUITS: - self.__cards.append(Card(rank=rank, suit=suit)) - - random.shuffle(self.__cards) - - @property - def total_remaining_cards(self): - return len(self.__cards) - - def draw(self): # Draw a card from the current deck - card = self.__cards.pop() - return card - -ACTIONS = ["Fold", "Call", "Raise"] +INVERSE_RANK_KEY = { + 14: "A", + 2: "2", + 3: "3", + 4: "4", + 5: "5", + 6: "6", + 7: "7", + 8: "8", + 9: "9", + 10: "T", + 11: "J", + 12: "Q", + 13: "K", +} + +SUIT_KEY = {"c": "Clubs", "d": "Diamonds", "h": "Hearts", "s": "Spades"} + + +class Card: + """ + You can initialize cards two ways: + - (RECOMMENDED) Card("Ah") + - Card(10, "Spades") + + """ + + # Immutable after it has been initialized + def __init__(self, rank_suit=None, rank=14, suit="Spades", generate_random=False) -> None: + + if rank_suit: # Ex: "KD" (King of diamonds), "10H" (10 of Hearts), + self.__rank = RANK_KEY[rank_suit[:-1]] + self.__suit = SUIT_KEY[rank_suit[-1].lower()] + + else: + self.__rank = rank + assert self.__rank >= 2 and self.__rank <= 14 + self.__suit = suit + + if generate_random: # If we want to just generate a random card + self.__rank = random.choice(CARD_RANKS) + self.__suit = random.choice(CARD_SUITS) + + # Check validity of card TODO: Maybe put into separate function to check wellformedness + if self.__rank not in CARD_RANKS: + raise Exception("Invalid Rank: {}".format(self.__rank)) + if self.__suit not in CARD_SUITS: + raise Exception("Invalid Suit: {}".format(self.__suit)) + + @property + def rank(self): + return self.__rank + + @property + def suit(self): + return self.__suit + + @property + def idx(self): + """ + [AC, AD, AH, AS, 2C, 2D, ... KH, KS] + 0 . 1 . 2 . 3 . 4 . 5 . 50, 51 + """ + rank = self.__rank + if self.__rank == 14: # for the aces + rank = 1 + rank -= 1 + return rank * 4 + CARD_SUITS_DICT[self.__suit] + + def __str__(self): # Following the Treys format of printing + return INVERSE_RANK_KEY[self.rank] + self.suit[0].lower() + + +class Deck: + def __init__(self) -> None: # Create a new full deck + self.__cards: List[Card] = [] + self.reset_deck() + + def shuffle(self): + random.shuffle(self.__cards) + + def reset_deck(self): + self.__cards = [] + for rank in CARD_RANKS: + for suit in CARD_SUITS: + self.__cards.append(Card(rank=rank, suit=suit)) + + random.shuffle(self.__cards) + + @property + def total_remaining_cards(self): + return len(self.__cards) + + def draw(self): # Draw a card from the current deck + card = self.__cards.pop() + return card +ACTIONS = ["Fold", "Call", "Raise"] + class CombinedHand: - def __init__(self, hand: List [Card]=[]): - self.hand: List[Card] = hand - self.hand_strength = 0 - self.h = 0 - self.comparator = 0 - - if hand != None: - self.update_binary_representation() - - def __str__(self): - s = "" - for h in self.hand: - s += str(h) + ", " - - return s - - def as_list(self): - # Save hand as a linary of characters - s = [] - for h in self.hand: - s.append(str(h)) - - return s - - - def __len__(self): - return len(self.hand) - - def update_binary_representation(self): - self.h = 0 - for card in self.hand: # Convert cards into our binary representation - self.h += 1 << int(4 * (card.rank - 1)) << CARD_SUITS_DICT[card.suit] # TODO: I can probs optimize by storing the multiplication in another CARDS_RANK_DICT table - - if card.rank == 14: # For aces, we need to add them at the beginning as well - self.h += 1 << CARD_SUITS_DICT[card.suit] - - def add_combined_hands(self, *hands): - for hand in hands: - for card in hand.hand: - self.hand.append(card) - - self.update_binary_representation() - - def add_cards(self, *cards): - for card in cards: - self.hand.append(card) - - self.update_binary_representation() - - def get_binary_representation(self): - return bin(self.h) - - def get_hand_strength(self, verbose=False): - # In case of ties, we set self.comparator: - # 1 (Royal Flush) - Always Tie - # 2 (Straight Flush) - Set self.comparator = [lowest_straight_flush] - # 3 (Four of A kind) - Set self.comparator = [four_of_a_kind, kicker] - # 4 (Full House) - self.comparator = [three_of_a_kind, two_of_a_kind] - # 5 (Flush) - self.comparator = [flush1, flush2, flush3, flush4, flush5] - # 6 (Straight) - self.comparator = lowest_straight - # 7 (Three of a kind) - self.comparator = [three_of_a_kind, kicker1, kicker2] - # 8 (Two-Pair) - self.comparator= [Rank1, Rank2, kicker] - # 9 (Pair) - self.comparator = [pair, kicker1, kicker2, kicker3] - # 10 (High Card) - self.comparator = [kicker1, kicker2, kicker3, kicker4, kicker5] - - # 1 - Royal Flush - h = self.h - royal_flush = (h >> 36) & (h >> 40) & (h >> 44) & (h >> 48) & (h >> 52) - if royal_flush: - if verbose: - print("Royal Flush of", CARD_BIT_SUITS_DICT[royal_flush]) - self.hand_strength = 1 - return - - # 2 - Straight Flush - h = self.h - hh = (h) & (h >> 4) & (h >> 8) & (h >> 12) & (h >> 16) - - if hh: - highest_low_card = 0 - checker = hh - for i in range(1,11): - if (checker & 15): - highest_low_card = i - checker = checker >> 4 - - self.hand_strength = 2 - self.comparator = [highest_low_card] - if verbose: - print("Straight Flush starting with :", self.comparator[0]) - return - # If TIE, you can just use hh to compare - - # 3 - Four of A Kind - h = self.h >> 4 # Ignore the first 4 aces - hh = (h) & (h >> 1) & (h >> 2) & (h >> 3) & BIT_MASK_1 - if hh: - four_of_a_kind = BIT_POSITION_TABLE[hh]//4 + 2 - self.hand_strength = 3 - kicker = 0 - for card in self.hand: - if (card.rank != four_of_a_kind): - kicker = max(kicker, card.rank) - - self.comparator = [four_of_a_kind,kicker] - if verbose: - print("Four of a kind: ", self.comparator[0], "Kicker: ", self.comparator[1]) # hh is guaranteed to only have a single "1" bit - return - - - # 4 - Full House - threes, threes_hh = self.check_threes() - twos = self.check_twos(threes_hh) # Exclusive pairs, not threes, needed for full house - if (len(threes) >= 1 and len(twos) >= 1) or len(threes) > 1: - self.hand_strength = 4 - - if (len(threes) > 1): # Edge case when there are two trips - # Search for largest pair - max_three = max(threes) - if (len(twos) == 0): - max_two = 0 - else: - max_two = max(twos) - - for three in threes: - if (three != max_three): - max_two = max(max_two, three) - self.comparator = [max_three, max_two] - - else: # Regular Case - self.comparator = [max(threes), max(twos)] - - if verbose: - print("Full house with threes of: {}, pair of: {}".format(self.comparator[0], self.comparator[1])) - return - - # 5 - Flush - h = self.h >> 4 # Ignore the right most aces - for idx, MASK in enumerate(BIT_MASKS): - hh = h & MASK - if bin(hh).count("1") >= 5: - suit = CARD_SUITS[idx] - final_hand = [] - for card in self.hand: - if (card.suit == suit): - final_hand.append(card.rank) - - final_hand = sorted(final_hand, reverse=True)[:5] # Sort from best to worst - self.hand_strength = 5 - - self.comparator = final_hand - if verbose: - print("Flush with hand: ",self.comparator) - return - - # 6 - Straight - h = self.h - hh1 = h & BIT_MASK_1 - hh1 = (hh1) | (hh1 << 1) | (hh1 << 2) | (hh1 << 3) - hh2 = h & BIT_MASK_2 - hh2 = (hh2) | (hh2 >> 1) | (hh2 << 1) | (hh2 << 2) - hh4 = h & BIT_MASK_4 - hh4 = (hh4) | (hh4 << 1) | (hh4 >> 1) | (hh4 >> 2) - hh8 = h & BIT_MASK_8 - hh8 = (hh8) | (hh8 >> 1) | (hh8 >> 2) | (hh8 >> 3) - hh = hh1 | hh2 | hh4 | hh8 - hh = (hh) & (hh >> 4) & (hh >> 8) & (hh >> 12) & (hh >> 16) - - if hh: - low_card = 1 - curr = 1 - n = hh - while (curr < 15): - if (n & 1): - low_card = curr - - curr += 1 - n = n >> 4 - - self.hand_strength = 6 - self.comparator = [low_card] - if verbose: - print("Straight starting from: ", self.comparator[0]) - return low_card - - - # 7 - Three of A Kind - # threes = self.check_threes() # This is already ran in the full house - if len(threes) == 1: # If more then 1 trips, we would have covered the case in the full-house - self.hand_strength = 7 - kickers = [] - for card in self.hand: - if (card.rank != threes[0]): - kickers.append(card.rank) - kickers.sort(reverse=True) - self.comparator = [threes[0], kickers[0], kickers[1]] - if verbose: - print("Three of a kind: ", self.comparator[0], "Kickers: ", self.comparator[1:]) #TODO: Check Value - return - - # 8 - Two Pair / 9 - One Pair - # twos = self.check_threes() # This is already ran in the full house - if len(twos) >= 1: # Move this for comparison? - twos.sort(reverse=True) - if len(twos) >= 2: # Two Pair - self.hand_strength = 8 - kicker = 0 - for card in self.hand: - if (card.rank != twos[0] and card.rank != twos[1]): - kicker = max(kicker, card.rank) - self.comparator = [twos[0], twos[1], kicker] - if verbose: - print("Two Pair: ", self.comparator[0], ", ", self.comparator[1], "Kicker: ", self.comparator[2]) #TODO: Check Value - else: # One Pair - self.hand_strength = 9 - kickers = [] - for card in self.hand: - if (card.rank != twos[0]): - kickers.append(card.rank) - kickers.sort(reverse=True) - self.comparator = [twos[0], kickers[0], kickers[1], kickers[2]] - if verbose: - print("One Pair: ", self.comparator[0], "Kickers: ", self.comparator[1:]) #TODO: Check Value - - return - - - # 10 - High Card - self.hand_strength = 10 - kickers = [] - for card in self.hand: - kickers.append(card.rank) - self.comparator = sorted(kickers, reverse=True)[:5] # From best to worst ranks - if verbose: - print("High Card: ", self.comparator[-1], "Kickers: ", self.comparator[:4]) - return - - - def check_threes(self): - h = self.h >> 4 # Ignore right most aces - hh = (((h) & (h >> 1) & (h >> 2)) | ((h >> 1) & (h >> 2) & (h >> 3)) | ((h) & (h >> 1) & (h >> 3)) | ((h) & (h >> 2) & (h >> 3))) & BIT_MASK_1 - - threes = [] - if hh: - low_card = 2 - n = hh - while True: - if (n & 1): - threes.append(low_card) - - if (low_card >= 14): #Exit loop when we reached last card - break - low_card += 1 - n = n >> 4 - - # No Guarantee that hh only has 1 bit, but the bit will always be on every 4th - return threes, hh - - def check_twos(self, threes_hh): - h = self.h >> 4 # Ignore right most aces - hh = (((h) & (h >> 1)) | ((h) & (h >> 2)) | ((h) & (h >> 3)) | ((h >> 1) & (h >> 2)) | ((h >> 1) & (h >> 3)) | ((h >> 2) & (h >> 3))) & BIT_MASK_1 - hh = hh ^ threes_hh - twos = [] - if hh: - low_card = 2 - n = hh - while True: - if (n & 1): - twos.append(low_card) - - if (low_card >= 14): #Exit loop when we reached last card - break - low_card += 1 - n = n >> 4 - - return twos - + def __init__(self, hand: List[Card] = []): + self.hand: List[Card] = hand + self.hand_strength = 0 + self.h = 0 + self.comparator = 0 + + if hand != None: + self.update_binary_representation() + + def __str__(self): + s = "" + for h in self.hand: + s += str(h) + ", " + + return s + + def as_list(self): + # Save hand as a linary of characters + s = [] + for h in self.hand: + s.append(str(h)) + + return s + + def __len__(self): + return len(self.hand) + + def update_binary_representation(self): + self.h = 0 + for card in self.hand: # Convert cards into our binary representation + self.h += ( + 1 << int(4 * (card.rank - 1)) << CARD_SUITS_DICT[card.suit] + ) # TODO: I can probs optimize by storing the multiplication in another CARDS_RANK_DICT table + + if card.rank == 14: # For aces, we need to add them at the beginning as well + self.h += 1 << CARD_SUITS_DICT[card.suit] + + def add_combined_hands(self, *hands): + for hand in hands: + for card in hand.hand: + self.hand.append(card) + + self.update_binary_representation() + + def add_cards(self, *cards): + for card in cards: + self.hand.append(card) + + self.update_binary_representation() + + def get_binary_representation(self): + return bin(self.h) + + def get_hand_strength(self, verbose=False): + # In case of ties, we set self.comparator: + # 1 (Royal Flush) - Always Tie + # 2 (Straight Flush) - Set self.comparator = [lowest_straight_flush] + # 3 (Four of A kind) - Set self.comparator = [four_of_a_kind, kicker] + # 4 (Full House) - self.comparator = [three_of_a_kind, two_of_a_kind] + # 5 (Flush) - self.comparator = [flush1, flush2, flush3, flush4, flush5] + # 6 (Straight) - self.comparator = lowest_straight + # 7 (Three of a kind) - self.comparator = [three_of_a_kind, kicker1, kicker2] + # 8 (Two-Pair) - self.comparator= [Rank1, Rank2, kicker] + # 9 (Pair) - self.comparator = [pair, kicker1, kicker2, kicker3] + # 10 (High Card) - self.comparator = [kicker1, kicker2, kicker3, kicker4, kicker5] + + # 1 - Royal Flush + h = self.h + royal_flush = (h >> 36) & (h >> 40) & (h >> 44) & (h >> 48) & (h >> 52) + if royal_flush: + if verbose: + print("Royal Flush of", CARD_BIT_SUITS_DICT[royal_flush]) + self.hand_strength = 1 + return + + # 2 - Straight Flush + h = self.h + hh = (h) & (h >> 4) & (h >> 8) & (h >> 12) & (h >> 16) + + if hh: + highest_low_card = 0 + checker = hh + for i in range(1, 11): + if checker & 15: + highest_low_card = i + checker = checker >> 4 + + self.hand_strength = 2 + self.comparator = [highest_low_card] + if verbose: + print("Straight Flush starting with :", self.comparator[0]) + return + # If TIE, you can just use hh to compare + + # 3 - Four of A Kind + h = self.h >> 4 # Ignore the first 4 aces + hh = (h) & (h >> 1) & (h >> 2) & (h >> 3) & BIT_MASK_1 + if hh: + four_of_a_kind = BIT_POSITION_TABLE[hh] // 4 + 2 + self.hand_strength = 3 + kicker = 0 + for card in self.hand: + if card.rank != four_of_a_kind: + kicker = max(kicker, card.rank) + + self.comparator = [four_of_a_kind, kicker] + if verbose: + print( + "Four of a kind: ", self.comparator[0], "Kicker: ", self.comparator[1] + ) # hh is guaranteed to only have a single "1" bit + return + + # 4 - Full House + threes, threes_hh = self.check_threes() + twos = self.check_twos(threes_hh) # Exclusive pairs, not threes, needed for full house + if (len(threes) >= 1 and len(twos) >= 1) or len(threes) > 1: + self.hand_strength = 4 + + if len(threes) > 1: # Edge case when there are two trips + # Search for largest pair + max_three = max(threes) + if len(twos) == 0: + max_two = 0 + else: + max_two = max(twos) + + for three in threes: + if three != max_three: + max_two = max(max_two, three) + self.comparator = [max_three, max_two] + + else: # Regular Case + self.comparator = [max(threes), max(twos)] + + if verbose: + print( + "Full house with threes of: {}, pair of: {}".format( + self.comparator[0], self.comparator[1] + ) + ) + return + + # 5 - Flush + h = self.h >> 4 # Ignore the right most aces + for idx, MASK in enumerate(BIT_MASKS): + hh = h & MASK + if bin(hh).count("1") >= 5: + suit = CARD_SUITS[idx] + final_hand = [] + for card in self.hand: + if card.suit == suit: + final_hand.append(card.rank) + + final_hand = sorted(final_hand, reverse=True)[:5] # Sort from best to worst + self.hand_strength = 5 + + self.comparator = final_hand + if verbose: + print("Flush with hand: ", self.comparator) + return + + # 6 - Straight + h = self.h + hh1 = h & BIT_MASK_1 + hh1 = (hh1) | (hh1 << 1) | (hh1 << 2) | (hh1 << 3) + hh2 = h & BIT_MASK_2 + hh2 = (hh2) | (hh2 >> 1) | (hh2 << 1) | (hh2 << 2) + hh4 = h & BIT_MASK_4 + hh4 = (hh4) | (hh4 << 1) | (hh4 >> 1) | (hh4 >> 2) + hh8 = h & BIT_MASK_8 + hh8 = (hh8) | (hh8 >> 1) | (hh8 >> 2) | (hh8 >> 3) + hh = hh1 | hh2 | hh4 | hh8 + hh = (hh) & (hh >> 4) & (hh >> 8) & (hh >> 12) & (hh >> 16) + + if hh: + low_card = 1 + curr = 1 + n = hh + while curr < 15: + if n & 1: + low_card = curr + + curr += 1 + n = n >> 4 + + self.hand_strength = 6 + self.comparator = [low_card] + if verbose: + print("Straight starting from: ", self.comparator[0]) + return low_card + + # 7 - Three of A Kind + # threes = self.check_threes() # This is already ran in the full house + if ( + len(threes) == 1 + ): # If more then 1 trips, we would have covered the case in the full-house + self.hand_strength = 7 + kickers = [] + for card in self.hand: + if card.rank != threes[0]: + kickers.append(card.rank) + kickers.sort(reverse=True) + self.comparator = [threes[0], kickers[0], kickers[1]] + if verbose: + print( + "Three of a kind: ", self.comparator[0], "Kickers: ", self.comparator[1:] + ) # TODO: Check Value + return + + # 8 - Two Pair / 9 - One Pair + # twos = self.check_threes() # This is already ran in the full house + if len(twos) >= 1: # Move this for comparison? + twos.sort(reverse=True) + if len(twos) >= 2: # Two Pair + self.hand_strength = 8 + kicker = 0 + for card in self.hand: + if card.rank != twos[0] and card.rank != twos[1]: + kicker = max(kicker, card.rank) + self.comparator = [twos[0], twos[1], kicker] + if verbose: + print( + "Two Pair: ", + self.comparator[0], + ", ", + self.comparator[1], + "Kicker: ", + self.comparator[2], + ) # TODO: Check Value + else: # One Pair + self.hand_strength = 9 + kickers = [] + for card in self.hand: + if card.rank != twos[0]: + kickers.append(card.rank) + kickers.sort(reverse=True) + self.comparator = [twos[0], kickers[0], kickers[1], kickers[2]] + if verbose: + print( + "One Pair: ", self.comparator[0], "Kickers: ", self.comparator[1:] + ) # TODO: Check Value + + return + + # 10 - High Card + self.hand_strength = 10 + kickers = [] + for card in self.hand: + kickers.append(card.rank) + self.comparator = sorted(kickers, reverse=True)[:5] # From best to worst ranks + if verbose: + print("High Card: ", self.comparator[-1], "Kickers: ", self.comparator[:4]) + return + + def check_threes(self): + h = self.h >> 4 # Ignore right most aces + hh = ( + ((h) & (h >> 1) & (h >> 2)) + | ((h >> 1) & (h >> 2) & (h >> 3)) + | ((h) & (h >> 1) & (h >> 3)) + | ((h) & (h >> 2) & (h >> 3)) + ) & BIT_MASK_1 + + threes = [] + if hh: + low_card = 2 + n = hh + while True: + if n & 1: + threes.append(low_card) + + if low_card >= 14: # Exit loop when we reached last card + break + low_card += 1 + n = n >> 4 + + # No Guarantee that hh only has 1 bit, but the bit will always be on every 4th + return threes, hh + + def check_twos(self, threes_hh): + h = self.h >> 4 # Ignore right most aces + hh = ( + ((h) & (h >> 1)) + | ((h) & (h >> 2)) + | ((h) & (h >> 3)) + | ((h >> 1) & (h >> 2)) + | ((h >> 1) & (h >> 3)) + | ((h >> 2) & (h >> 3)) + ) & BIT_MASK_1 + hh = hh ^ threes_hh + twos = [] + if hh: + low_card = 2 + n = hh + while True: + if n & 1: + twos.append(low_card) + + if low_card >= 14: # Exit loop when we reached last card + break + low_card += 1 + n = n >> 4 + + return twos + class Evaluator: - def __init__(self): - self.hands: List[CombinedHand] = [] - - def add_hands(self, *combined_hands: CombinedHand): - for combined_hand in combined_hands: - self.hands.append(combined_hand) - - def clear_hands(self): - self.hands = [] - - def __str__(self): - ans = "" - for hand in self.hands: - ans += str(hand) + " " - ans += '\n' - return ans - - def get_winner(self) -> List[int]: # Return a list of 0-indexed of players who won the pot. If multiple, then split - for hand in self.hands: - hand.get_hand_strength() - hand_strengths = [hand.hand_strength for hand in self.hands] - best_hand_val = min(hand_strengths) - potential_winners = [i for i, x in enumerate(hand_strengths) if x == best_hand_val] - - # TODO: Idea to optimize in the future, just make the best hand as a list, and then compare if necessary. - - if len(potential_winners) > 1: # Potential ties - if best_hand_val == 1: # Royal Flush, Automatic Tie - return potential_winners - - elif best_hand_val == 2: # Straight Flush, check low card - highest_low_card = 0 - for winner in potential_winners: - highest_low_card = max(highest_low_card, self.hands[winner].comparator[0]) - winners = [] - for winner in potential_winners: - if (self.hands[winner].comparator[0] == highest_low_card): - winners.append(winner) - return winners - - elif best_hand_val == 3: # Four of a kind - highest_four = 0 - highest_kicker = 0 - for winner in potential_winners: - highest_four = max(highest_four, self.hands[winner].comparator[0]) - highest_kicker = max(highest_kicker, self.hands[winner].comparator[1]) - - winners = [] - for winner in potential_winners: - if (self.hands[winner].comparator[0] == highest_four and self.hands[winner].comparator[1] == highest_kicker): - winners.append(winner) - - return winners - - elif best_hand_val == 4: # Full House - highest_threes = 0 - highest_twos = 0 - for winner in potential_winners: - highest_threes = max(highest_threes, self.hands[winner].comparator[0]) - highest_twos = max(highest_twos, self.hands[winner].comparator[1]) - - winners = [] - for winner in potential_winners: - if (self.hands[winner].comparator[0] == highest_threes and self.hands[winner].comparator[1] == highest_twos): # Pick player with best full house - winners.append(winner) - - if (len(winners) ==0): # Edge case when we have full house over full house - for winner in potential_winners: - if (self.hands[winner].comparator[0] == highest_threes): - winners.append(winner) - return winners - - elif best_hand_val == 5: # Flush - best_flush = [0,0,0,0,0] - - # Check from best card to worst card. - for i in range(5): - for winner in potential_winners: - best_flush[i] = max(best_flush[i], self.hands[winner].comparator[i]) - - winners = [] - for winner in potential_winners: - if (self.hands[winner].comparator[i] == best_flush[i]): - winners.append(winner) - - if (len(winners) == 1): # Whenever there is only 1 winner, just return - return winners - - return winners - - elif best_hand_val == 6: # Straight - highest_low_card = 0 - for winner in potential_winners: - highest_low_card = max(highest_low_card, self.hands[winner].comparator[0]) - - winners = [] - for winner in potential_winners: - if (highest_low_card == self.hands[winner].comparator[0]): - winners.append(winner) - - return winners - - elif best_hand_val == 7: # Three of a kind - best_hand = [0,0,0] # [three_of_a_kind, kicker1, kicker2] - for i in range(3): - for winner in potential_winners: - best_hand[i] = max(best_hand[i], self.hands[winner].comparator[i]) - - winners = [] - for winner in potential_winners: - if (self.hands[winner].comparator[i] == best_hand[i]): winners.append(winner) - if len(winners) == 1: - return winners - - return winners # In case of tie, this will be run - - elif best_hand_val == 8: # Two Pair - best_hand = [0,0,0] # [best_pair1, best_pair2, kicker] - for i in range(3): - for winner in potential_winners: - best_hand[i] = max(best_hand[i], self.hands[winner].comparator[i]) - - winners = [] - for winner in potential_winners: - if (self.hands[winner].comparator[i] == best_hand[i]): winners.append(winner) - if len(winners) == 1: - return winners - - return winners # In case of tie, this will be run - - elif best_hand_val == 9: # One Pair - best_hand = [0,0,0,0] # [pair, kicker1, kicker2, kicker3] - for i in range(4): - for winner in potential_winners: - best_hand[i] = max(best_hand[i], self.hands[winner].comparator[i]) - - winners = [] - for winner in potential_winners: - if (self.hands[winner].comparator[i] == best_hand[i]): winners.append(winner) - if len(winners) == 1: - return winners - - return winners # In case of time, this will be run - elif best_hand_val == 10: # High Card - best_hand = [0,0,0,0,0] # [kicker1, kicker2, kicker3, kicker4, kicker5] - for i in range(5): - for winner in potential_winners: - best_hand[i] = max(best_hand[i], self.hands[winner].comparator[i]) - - winners = [] - for winner in potential_winners: - if (self.hands[winner].comparator[i] == best_hand[i]): winners.append(winner) - if len(winners) == 1: - return winners - - return winners - - - else: # Single person has the best hand - return potential_winners - \ No newline at end of file + def __init__(self): + self.hands: List[CombinedHand] = [] + + def add_hands(self, *combined_hands: CombinedHand): + for combined_hand in combined_hands: + self.hands.append(combined_hand) + + def clear_hands(self): + self.hands = [] + + def __str__(self): + ans = "" + for hand in self.hands: + ans += str(hand) + " " + ans += "\n" + return ans + + def get_winner( + self, + ) -> List[ + int + ]: # Return a list of 0-indexed of players who won the pot. If multiple, then split + for hand in self.hands: + hand.get_hand_strength() + hand_strengths = [hand.hand_strength for hand in self.hands] + best_hand_val = min(hand_strengths) + potential_winners = [i for i, x in enumerate(hand_strengths) if x == best_hand_val] + + # TODO: Idea to optimize in the future, just make the best hand as a list, and then compare if necessary. + + if len(potential_winners) > 1: # Potential ties + if best_hand_val == 1: # Royal Flush, Automatic Tie + return potential_winners + + elif best_hand_val == 2: # Straight Flush, check low card + highest_low_card = 0 + for winner in potential_winners: + highest_low_card = max(highest_low_card, self.hands[winner].comparator[0]) + winners = [] + for winner in potential_winners: + if self.hands[winner].comparator[0] == highest_low_card: + winners.append(winner) + return winners + + elif best_hand_val == 3: # Four of a kind + highest_four = 0 + highest_kicker = 0 + for winner in potential_winners: + highest_four = max(highest_four, self.hands[winner].comparator[0]) + highest_kicker = max(highest_kicker, self.hands[winner].comparator[1]) + + winners = [] + for winner in potential_winners: + if ( + self.hands[winner].comparator[0] == highest_four + and self.hands[winner].comparator[1] == highest_kicker + ): + winners.append(winner) + + return winners + + elif best_hand_val == 4: # Full House + highest_threes = 0 + highest_twos = 0 + for winner in potential_winners: + highest_threes = max(highest_threes, self.hands[winner].comparator[0]) + highest_twos = max(highest_twos, self.hands[winner].comparator[1]) + + winners = [] + for winner in potential_winners: + if ( + self.hands[winner].comparator[0] == highest_threes + and self.hands[winner].comparator[1] == highest_twos + ): # Pick player with best full house + winners.append(winner) + + if len(winners) == 0: # Edge case when we have full house over full house + for winner in potential_winners: + if self.hands[winner].comparator[0] == highest_threes: + winners.append(winner) + return winners + + elif best_hand_val == 5: # Flush + best_flush = [0, 0, 0, 0, 0] + + # Check from best card to worst card. + for i in range(5): + for winner in potential_winners: + best_flush[i] = max(best_flush[i], self.hands[winner].comparator[i]) + + winners = [] + for winner in potential_winners: + if self.hands[winner].comparator[i] == best_flush[i]: + winners.append(winner) + + if len(winners) == 1: # Whenever there is only 1 winner, just return + return winners + + return winners + + elif best_hand_val == 6: # Straight + highest_low_card = 0 + for winner in potential_winners: + highest_low_card = max(highest_low_card, self.hands[winner].comparator[0]) + + winners = [] + for winner in potential_winners: + if highest_low_card == self.hands[winner].comparator[0]: + winners.append(winner) + + return winners + + elif best_hand_val == 7: # Three of a kind + best_hand = [0, 0, 0] # [three_of_a_kind, kicker1, kicker2] + for i in range(3): + for winner in potential_winners: + best_hand[i] = max(best_hand[i], self.hands[winner].comparator[i]) + + winners = [] + for winner in potential_winners: + if self.hands[winner].comparator[i] == best_hand[i]: + winners.append(winner) + if len(winners) == 1: + return winners + + return winners # In case of tie, this will be run + + elif best_hand_val == 8: # Two Pair + best_hand = [0, 0, 0] # [best_pair1, best_pair2, kicker] + for i in range(3): + for winner in potential_winners: + best_hand[i] = max(best_hand[i], self.hands[winner].comparator[i]) + + winners = [] + for winner in potential_winners: + if self.hands[winner].comparator[i] == best_hand[i]: + winners.append(winner) + if len(winners) == 1: + return winners + + return winners # In case of tie, this will be run + + elif best_hand_val == 9: # One Pair + best_hand = [0, 0, 0, 0] # [pair, kicker1, kicker2, kicker3] + for i in range(4): + for winner in potential_winners: + best_hand[i] = max(best_hand[i], self.hands[winner].comparator[i]) + + winners = [] + for winner in potential_winners: + if self.hands[winner].comparator[i] == best_hand[i]: + winners.append(winner) + if len(winners) == 1: + return winners + + return winners # In case of time, this will be run + elif best_hand_val == 10: # High Card + best_hand = [0, 0, 0, 0, 0] # [kicker1, kicker2, kicker3, kicker4, kicker5] + for i in range(5): + for winner in potential_winners: + best_hand[i] = max(best_hand[i], self.hands[winner].comparator[i]) + + winners = [] + for winner in potential_winners: + if self.hands[winner].comparator[i] == best_hand[i]: + winners.append(winner) + if len(winners) == 1: + return winners + + return winners + + else: # Single person has the best hand + return potential_winners diff --git a/src/fast_evaluator.py b/src/fast_evaluator.py index 34bd96c..0386940 100644 --- a/src/fast_evaluator.py +++ b/src/fast_evaluator.py @@ -2,25 +2,26 @@ This is a fast evaluator used for training. It works with string representation of cards. However, it cannot tell you if you won with a pair, three of a kind, etc. """ -import random + +import random from phevaluator import evaluate_cards + def Deck(excluded_cards=[]): - # Returns a shuffled deck - deck = [] - for rank in ["A", "2", "3", "4", "5", "6", "7", "8", "9", "T", "J", "Q", "K"]: - for suit in ["h", "d", "s", "c"]: - if (rank+suit not in excluded_cards): - deck.append(rank + suit) - - random.shuffle(deck) - return deck + # Returns a shuffled deck + deck = [] + for rank in ["A", "2", "3", "4", "5", "6", "7", "8", "9", "T", "J", "Q", "K"]: + for suit in ["h", "d", "s", "c"]: + if rank + suit not in excluded_cards: + deck.append(rank + suit) + + random.shuffle(deck) + return deck + def get_player_score(player_cards, board=[]): - """Wrapper for the evaluate_cards function by phevaluator. - - """ - assert(len(player_cards) == 2) - assert(len(board) <= 5) - # Returns a score using the phevaluator library - return evaluate_cards(*(player_cards + board)) \ No newline at end of file + """Wrapper for the evaluate_cards function by phevaluator.""" + assert len(player_cards) == 2 + assert len(board) <= 5 + # Returns a score using the phevaluator library + return evaluate_cards(*(player_cards + board)) diff --git a/src/holdem.py b/src/holdem.py index 552d19f..713c282 100644 --- a/src/holdem.py +++ b/src/holdem.py @@ -1,4 +1,3 @@ - import base from base import Player, Action import random @@ -9,425 +8,449 @@ class HoldEmHistory(base.History): - """ - Example of history: - First two actions are the cards dealt to the players. The rest of the actions are the actions taken by the players. - 1. ['AkTh', 'QdKd', 'b2', 'c', '/', 'QhJdKs', 'b2', 'c', '/', 'k', 'k'] - - ---- ACTIONS ---- - - k = check - - bX = bet X amount (this includes raising) - - c = call - - f = fold (you cannot fold if the other player just checked) - - Every round starts the same way: - Small blind = 1 chip - Big blind = 2 chips - - Total chips = 100BB per player. - Minimum raise = X to match bet, and Y is the raise amount - If no raise before, then the minimum raise amount is 2x the bet amount (preflop would be 2x big blind). - Else it is whatever was previously raised. This is not the same as 2x the previous bet amount. Just the Y raise amount. - - Ex: The bet is 10$. I raise to 50$, so I raised by 40$ (Y = 40). The next player's minimum raise is not 100$, but rather to 90$, since (it's 50$ to match the bet, and 40$ to match the raise). - - Minimum bet = 1 chip (0.5BB) - - The API for the history is inspired from the Slumbot API. - - I want to avoid all the extra overhead, so taking inspiration from `environment.py` with the `PokerEnvironment` - - - """ - def __init__(self, history: List[Action] = []): - super().__init__(history) - - - def is_terminal(self): - if len(self.history) == 0: return False - folded = self.history[-1] == 'f' - is_showdown = self.history.count('/') == 3 and self.history[-1] == 'c' # Showdown, since one of the players is calling - if folded or is_showdown: - return True - else: - return False - - def get_current_cards(self): - current_cards = [] - new_stage = False - stage_i = 0 - for i, action in enumerate(self.history): - if new_stage: - new_stage = False - if stage_i == 1: # Flop, so there are 3 community cards - assert(len(action) == 6) - current_cards.append(action[:2]) # Community card 1 - current_cards.append(action[2:4]) # Community card 2 - current_cards.append(action[4:6]) # Community card 3 - - else: # Turn or river - current_cards.append(action) # Community card - elif action == '/': - new_stage = True - stage_i += 1 - - elif i == 0 or i == 1: - assert(len(action) == 4) - current_cards.append(action[:2]) # Private card 1 - current_cards.append(action[2:4]) # Private card 2 - - return current_cards - - def get_current_game_stage_history(self): - """ - return current_game_stage_history, stages[stage_i] excluding the community cards drawn. We only care about the actions - of the players. - """ - game_stage_start = 2 # Because we are skipping the pairs of private cards drawn at the beginning of the round - stage_i = 0 - stages = ['preflop', 'flop', 'turn', 'river'] - for i, action in enumerate(self.history): - if action == '/': - game_stage_start = i + 2 # Skip the community card - stage_i += 1 - - if game_stage_start >= len(self.history): - return [], stages[stage_i] - else: - current_game_stage_history = self.history[game_stage_start:] - return current_game_stage_history, stages[stage_i] - - def actions(self): - if self.is_chance(): - if len(self.history) > 2 and self.history[-1] != '/': return ['/'] - else: - cards_to_exclude = self.get_current_cards() - cards = Deck(cards_to_exclude) - return cards - - elif not self.is_terminal(): - assert(not self.game_stage_ended()) # game_stage_ended would mean that it is a chance node - """ + """ + Example of history: + First two actions are the cards dealt to the players. The rest of the actions are the actions taken by the players. + 1. ['AkTh', 'QdKd', 'b2', 'c', '/', 'QhJdKs', 'b2', 'c', '/', 'k', 'k'] + + ---- ACTIONS ---- + - k = check + - bX = bet X amount (this includes raising) + - c = call + - f = fold (you cannot fold if the other player just checked) + + Every round starts the same way: + Small blind = 1 chip + Big blind = 2 chips + + Total chips = 100BB per player. + Minimum raise = X to match bet, and Y is the raise amount + If no raise before, then the minimum raise amount is 2x the bet amount (preflop would be 2x big blind). + Else it is whatever was previously raised. This is not the same as 2x the previous bet amount. Just the Y raise amount. + + Ex: The bet is 10$. I raise to 50$, so I raised by 40$ (Y = 40). The next player's minimum raise is not 100$, but rather to 90$, since (it's 50$ to match the bet, and 40$ to match the raise). + + Minimum bet = 1 chip (0.5BB) + + The API for the history is inspired from the Slumbot API. + + I want to avoid all the extra overhead, so taking inspiration from `environment.py` with the `PokerEnvironment` + + + """ + + def __init__(self, history: List[Action] = []): + super().__init__(history) + + def is_terminal(self): + if len(self.history) == 0: + return False + folded = self.history[-1] == "f" + is_showdown = ( + self.history.count("/") == 3 and self.history[-1] == "c" + ) # Showdown, since one of the players is calling + if folded or is_showdown: + return True + else: + return False + + def get_current_cards(self): + current_cards = [] + new_stage = False + stage_i = 0 + for i, action in enumerate(self.history): + if new_stage: + new_stage = False + if stage_i == 1: # Flop, so there are 3 community cards + assert len(action) == 6 + current_cards.append(action[:2]) # Community card 1 + current_cards.append(action[2:4]) # Community card 2 + current_cards.append(action[4:6]) # Community card 3 + + else: # Turn or river + current_cards.append(action) # Community card + elif action == "/": + new_stage = True + stage_i += 1 + + elif i == 0 or i == 1: + assert len(action) == 4 + current_cards.append(action[:2]) # Private card 1 + current_cards.append(action[2:4]) # Private card 2 + + return current_cards + + def get_current_game_stage_history(self): + """ + return current_game_stage_history, stages[stage_i] excluding the community cards drawn. We only care about the actions + of the players. + """ + game_stage_start = 2 # Because we are skipping the pairs of private cards drawn at the beginning of the round + stage_i = 0 + stages = ["preflop", "flop", "turn", "river"] + for i, action in enumerate(self.history): + if action == "/": + game_stage_start = i + 2 # Skip the community card + stage_i += 1 + + if game_stage_start >= len(self.history): + return [], stages[stage_i] + else: + current_game_stage_history = self.history[game_stage_start:] + return current_game_stage_history, stages[stage_i] + + def actions(self): + if self.is_chance(): + if len(self.history) > 2 and self.history[-1] != "/": + return ["/"] + else: + cards_to_exclude = self.get_current_cards() + cards = Deck(cards_to_exclude) + return cards + + elif not self.is_terminal(): + assert ( + not self.game_stage_ended() + ) # game_stage_ended would mean that it is a chance node + """ To limit this game going to infinity, I only allow for 3 betting rounds. I.e. if I bet, you raise, I raise, you raise, then I must either call, fold, or all-in. Else the branching factor is going to be insane. """ - actions = ['k', 'c', 'f'] - player = self.player() - remaining_amount = self.get_remaining_balance(player) - min_bet = self.get_min_bet() - - for bet_size in range(min_bet, remaining_amount + 1): # These define the legal actions of the game - actions.append('b' + str(bet_size)) - - current_game_stage_history, stage = self.get_current_game_stage_history() - # Pre-flop - if stage == 'preflop': - # Small blind to act - if len(current_game_stage_history) == 0: # Action on SB (Dealer), who can either call, bet, or fold - actions.remove('k') # You cannot check - return actions - - # big blind to act - elif len(current_game_stage_history) == 1: # 2-bet - if (current_game_stage_history[0] == 'c'): # Small blind called, you don't need to fold - actions.remove('f') - return actions - else: # Other player has bet, so you cannot check - actions.remove('k') - return actions - else: - actions.remove('k') - - # elif len(current_game_stage_history) == 2: # 3-bet - # # You cannot check at this point - # actions = ['b1', 'all-in', 'c', 'f'] - - # elif len(current_game_stage_history) == 3: # 4-bet - # actions = ['all-in', 'c', 'f'] - - else: # flop, turn, river - if len(current_game_stage_history) == 0: - actions.remove('f') # You cannot fold - elif len(current_game_stage_history) == 1: - if current_game_stage_history[0] == 'k': - actions.remove('f') - else: # Opponent has bet, so you cannot check - actions.remove('k') - else: - actions.remove('k') - - return actions - else: - raise Exception("Cannot call actions on a terminal history") - - def get_min_bet(self): - # TODO: Test this function - curr_bet = 0 - prev_bet = 0 - for i in range(len(self.history)-1, 0, -1): - if self.history[i][0] == 'b': # Bet, might be a raise - if curr_bet == 0: - curr_bet = int(self.history[i][1:]) - elif prev_bet == 0: - prev_bet = int(self.history[i][1:]) - elif self.history[i] == '/': - break - - # Handle case when game stage is preflop, in which case a bet is already placed for you - game_stage_history, game_stage = self.get_current_game_stage_history() - if game_stage == 'preflop' and curr_bet == 0: - curr_bet = 2 # big blind - elif curr_bet == 0: # No bets has been placed - assert(prev_bet == 0) - curr_bet = 1 - - return int(curr_bet + (curr_bet - prev_bet)) # This is the minimum raise - - - def calculate_player_total_up_to_game_stage(self, player: Player): - stage_i = 0 - player_total = 0 # Total across all game stages (preflop, flop, turn, river) - player_game_stage_total = 0 # Total for a given game stage - i = 0 - for hist_idx, hist in enumerate(self.history): - i = (i + 1) % 2 - if i == player: - if hist[0] == 'b': - player_game_stage_total = int(hist[1:]) - elif hist == 'k': - if stage_i == 0: # preflop, checking means 2 - player_game_stage_total = 2 - else: - player_game_stage_total = 0 - elif hist == 'c': # Call the previous bet - # Exception for when you can call the big blind on the preflop, without the big blind having bet previously - if hist_idx == 2: - player_game_stage_total = 2 - else: - player_game_stage_total = int(self.history[hist_idx - 1][1:]) - - if hist == '/': - stage_i += 1 - player_total += player_game_stage_total - player_game_stage_total = 0 - if stage_i == 1: - i = (i + 1) % 2 # We need to flip the order post-flop, as the BB is the one who acts first now - - return player_total - - - def get_remaining_balance(self, player: Player): - # Each player starts with a balance of 100 at the beginning of each hand - return 100 - self.calculate_player_total_up_to_game_stage(player) - - def game_stage_ended(self): - # TODO: Make sure this logic is good - current_game_stage_history, stage = self.get_current_game_stage_history() - if len(current_game_stage_history) == 0: - return False - elif current_game_stage_history[-1] == 'f': - return True - elif current_game_stage_history[-1] == 'c' and len(self.history) > 3: # On pre-flop, when the small blind calls, the opponent can still bet - return True - elif len(current_game_stage_history) >= 2 and current_game_stage_history[-2:] == ['k', 'k']: - return True - else: - return False - - def player(self): - """ - This part is confusing for heads-up no limit poker, because the player that acts first changes: - The Small Blind (SB) acts first pre-flop, but the Big Blind (BB) acts first post-flop. - 1. ['AkTh', 'QdKd', 'b2', 'c', '/', 'Qh', 'b2', 'c', '/', '2d', b2', 'f'] - SB BB BB SB BB SB - """ - if len(self.history) <= 1: return -1 - elif self.game_stage_ended(): return -1 - elif self.history[-1] == '/': return -1 - else: - if '/' in self.history: - return (len(self.history) + 1) % 2 # Order is flipped post-flop - else: - return len(self.history) % 2 - - def is_chance(self): - return super().is_chance() - - def sample_chance_outcome(self): - assert(self.is_chance()) - - cards = self.actions() # Will be either or cards not seen in the deck or ['/'] - - if len(self.history) <= 1: # We need to deal two cards to each player - cards = random.sample(cards, 2) - return ''.join(cards) - else: - return random.choice(cards) # Sample one of the community cards with equal probability - - - def terminal_utility(self, i: Player) -> int: - assert(self.is_terminal()) # We can only call the utility for a terminal history - assert(i in [0, 1]) # Only works for 2 player games for now - - actions = ['k', 'b1', 'b2', 'b4', 'b8', 'all-in', 'c', 'f'] - pot_size = 0 - # These represent the bets in the current game stage, i.e. pre-flop, flop, turn, river - prev_bet = 1 # small blind starting value - curr_bet = 2 # big blind starting value - for i, action in enumerate(self.history): - if action == '/': # Move on to next stage - assert(curr_bet == prev_bet and curr_bet == 0) - pot_size += curr_bet - prev_bet = 0 - - if action not in actions: - continue - - if action == 'k': - assert(curr_bet == prev_bet and curr_bet == 0) - - elif action == 'b1': - assert(curr_bet == 0) - curr_bet = 1 - - elif action == 'b2': - if curr_bet == 0: - assert(prev_bet == 0) - curr_bet = 2 - else: - prev_bet = curr_bet - curr_bet *= 2 - elif action == 'b4': - if curr_bet == 0: - assert(prev_bet == 0) - curr_bet = 4 - else: - prev_bet = curr_bet - curr_bet *= 4 - elif action == 'b8': - if curr_bet == 0: - assert(prev_bet == 0) - curr_bet = 8 - else: - prev_bet == curr_bet - curr_bet *= 8 - - elif action == 'all-in': - curr_bet = 100 - pot_size - curr_bet # Maximum, since each player has 100 chips - - elif action == 'c': - assert(curr_bet != 0) - pot_size += 2 * curr_bet - curr_bet = 0 - prev_bet = 0 - - elif action == 'f': - assert(i == len(self.history) - 1) # Folding should be the last action - - pot_size += prev_bet - pot_size += curr_bet - - else: - raise Exception("Action not recognized") - - # Now that we know how much we won from the pot, we also we to calculate how much we made ourselves - - - def __add__(self, action: Action): - new_history = HoldEmHistory(self.history + [action]) - return new_history - - def get_infoSet_key(self, kmeans_flop, kmeans_turn, kmeans_river) -> List[Action]: - assert(not self.is_chance()) - assert(not self.is_terminal()) - - player = self.player() - history = copy.deepcopy(self.history) - print(history) - - # ----- Assign cluster ID for PREFLOP ----- - player_cards = [] - if player == 0: - player_cards = history[0] - history[0] = get_preflop_cluster_id(history[0]) - history[1] = '?' - else: - player_cards = history[1] - history[0] = '?' - history[1] = get_preflop_cluster_id(history[1]) - - # ----- Assign cluster ID for FLOP/TURN/RIVER ----- - community_cards = '' - new_stage = False - stage_i = 0 - for i, action in enumerate(history): - if new_stage: - new_stage = False - if stage_i == 1: - assert(len(action) == 6) - community_cards += action - history[i] = get_flop_cluster_id(kmeans_flop, player_cards+community_cards) - - elif stage_i == 2: - assert(len(action) == 2) - community_cards += action - history[i] = get_turn_cluster_id(kmeans_turn, player_cards+community_cards) - elif stage_i == 3: - assert(len(action) == 2) - community_cards += action - history[i] = get_river_cluster_id(kmeans_river, player_cards+community_cards) - elif action == '/': - new_stage = True - stage_i += 1 - - - return history + actions = ["k", "c", "f"] + player = self.player() + remaining_amount = self.get_remaining_balance(player) + min_bet = self.get_min_bet() + + for bet_size in range( + min_bet, remaining_amount + 1 + ): # These define the legal actions of the game + actions.append("b" + str(bet_size)) + + current_game_stage_history, stage = self.get_current_game_stage_history() + # Pre-flop + if stage == "preflop": + # Small blind to act + if ( + len(current_game_stage_history) == 0 + ): # Action on SB (Dealer), who can either call, bet, or fold + actions.remove("k") # You cannot check + return actions + + # big blind to act + elif len(current_game_stage_history) == 1: # 2-bet + if ( + current_game_stage_history[0] == "c" + ): # Small blind called, you don't need to fold + actions.remove("f") + return actions + else: # Other player has bet, so you cannot check + actions.remove("k") + return actions + else: + actions.remove("k") + + # elif len(current_game_stage_history) == 2: # 3-bet + # # You cannot check at this point + # actions = ['b1', 'all-in', 'c', 'f'] + + # elif len(current_game_stage_history) == 3: # 4-bet + # actions = ['all-in', 'c', 'f'] + + else: # flop, turn, river + if len(current_game_stage_history) == 0: + actions.remove("f") # You cannot fold + elif len(current_game_stage_history) == 1: + if current_game_stage_history[0] == "k": + actions.remove("f") + else: # Opponent has bet, so you cannot check + actions.remove("k") + else: + actions.remove("k") + + return actions + else: + raise Exception("Cannot call actions on a terminal history") + + def get_min_bet(self): + # TODO: Test this function + curr_bet = 0 + prev_bet = 0 + for i in range(len(self.history) - 1, 0, -1): + if self.history[i][0] == "b": # Bet, might be a raise + if curr_bet == 0: + curr_bet = int(self.history[i][1:]) + elif prev_bet == 0: + prev_bet = int(self.history[i][1:]) + elif self.history[i] == "/": + break + + # Handle case when game stage is preflop, in which case a bet is already placed for you + game_stage_history, game_stage = self.get_current_game_stage_history() + if game_stage == "preflop" and curr_bet == 0: + curr_bet = 2 # big blind + elif curr_bet == 0: # No bets has been placed + assert prev_bet == 0 + curr_bet = 1 + + return int(curr_bet + (curr_bet - prev_bet)) # This is the minimum raise + + def calculate_player_total_up_to_game_stage(self, player: Player): + stage_i = 0 + player_total = 0 # Total across all game stages (preflop, flop, turn, river) + player_game_stage_total = 0 # Total for a given game stage + i = 0 + for hist_idx, hist in enumerate(self.history): + i = (i + 1) % 2 + if i == player: + if hist[0] == "b": + player_game_stage_total = int(hist[1:]) + elif hist == "k": + if stage_i == 0: # preflop, checking means 2 + player_game_stage_total = 2 + else: + player_game_stage_total = 0 + elif hist == "c": # Call the previous bet + # Exception for when you can call the big blind on the preflop, without the big blind having bet previously + if hist_idx == 2: + player_game_stage_total = 2 + else: + player_game_stage_total = int(self.history[hist_idx - 1][1:]) + + if hist == "/": + stage_i += 1 + player_total += player_game_stage_total + player_game_stage_total = 0 + if stage_i == 1: + i = ( + i + 1 + ) % 2 # We need to flip the order post-flop, as the BB is the one who acts first now + + return player_total + + def get_remaining_balance(self, player: Player): + # Each player starts with a balance of 100 at the beginning of each hand + return 100 - self.calculate_player_total_up_to_game_stage(player) + + def game_stage_ended(self): + # TODO: Make sure this logic is good + current_game_stage_history, stage = self.get_current_game_stage_history() + if len(current_game_stage_history) == 0: + return False + elif current_game_stage_history[-1] == "f": + return True + elif ( + current_game_stage_history[-1] == "c" and len(self.history) > 3 + ): # On pre-flop, when the small blind calls, the opponent can still bet + return True + elif len(current_game_stage_history) >= 2 and current_game_stage_history[-2:] == ["k", "k"]: + return True + else: + return False + + def player(self): + """ + This part is confusing for heads-up no limit poker, because the player that acts first changes: + The Small Blind (SB) acts first pre-flop, but the Big Blind (BB) acts first post-flop. + 1. ['AkTh', 'QdKd', 'b2', 'c', '/', 'Qh', 'b2', 'c', '/', '2d', b2', 'f'] + SB BB BB SB BB SB + """ + if len(self.history) <= 1: + return -1 + elif self.game_stage_ended(): + return -1 + elif self.history[-1] == "/": + return -1 + else: + if "/" in self.history: + return (len(self.history) + 1) % 2 # Order is flipped post-flop + else: + return len(self.history) % 2 + + def is_chance(self): + return super().is_chance() + + def sample_chance_outcome(self): + assert self.is_chance() + + cards = self.actions() # Will be either or cards not seen in the deck or ['/'] + + if len(self.history) <= 1: # We need to deal two cards to each player + cards = random.sample(cards, 2) + return "".join(cards) + else: + return random.choice(cards) # Sample one of the community cards with equal probability + + def terminal_utility(self, i: Player) -> int: + assert self.is_terminal() # We can only call the utility for a terminal history + assert i in [0, 1] # Only works for 2 player games for now + + actions = ["k", "b1", "b2", "b4", "b8", "all-in", "c", "f"] + pot_size = 0 + # These represent the bets in the current game stage, i.e. pre-flop, flop, turn, river + prev_bet = 1 # small blind starting value + curr_bet = 2 # big blind starting value + for i, action in enumerate(self.history): + if action == "/": # Move on to next stage + assert curr_bet == prev_bet and curr_bet == 0 + pot_size += curr_bet + prev_bet = 0 + + if action not in actions: + continue + + if action == "k": + assert curr_bet == prev_bet and curr_bet == 0 + + elif action == "b1": + assert curr_bet == 0 + curr_bet = 1 + + elif action == "b2": + if curr_bet == 0: + assert prev_bet == 0 + curr_bet = 2 + else: + prev_bet = curr_bet + curr_bet *= 2 + elif action == "b4": + if curr_bet == 0: + assert prev_bet == 0 + curr_bet = 4 + else: + prev_bet = curr_bet + curr_bet *= 4 + elif action == "b8": + if curr_bet == 0: + assert prev_bet == 0 + curr_bet = 8 + else: + prev_bet == curr_bet + curr_bet *= 8 + + elif action == "all-in": + curr_bet = 100 - pot_size - curr_bet # Maximum, since each player has 100 chips + + elif action == "c": + assert curr_bet != 0 + pot_size += 2 * curr_bet + curr_bet = 0 + prev_bet = 0 + + elif action == "f": + assert i == len(self.history) - 1 # Folding should be the last action + + pot_size += prev_bet + pot_size += curr_bet + + else: + raise Exception("Action not recognized") + + # Now that we know how much we won from the pot, we also we to calculate how much we made ourselves + + def __add__(self, action: Action): + new_history = HoldEmHistory(self.history + [action]) + return new_history + + def get_infoSet_key(self, kmeans_flop, kmeans_turn, kmeans_river) -> List[Action]: + assert not self.is_chance() + assert not self.is_terminal() + + player = self.player() + history = copy.deepcopy(self.history) + print(history) + + # ----- Assign cluster ID for PREFLOP ----- + player_cards = [] + if player == 0: + player_cards = history[0] + history[0] = get_preflop_cluster_id(history[0]) + history[1] = "?" + else: + player_cards = history[1] + history[0] = "?" + history[1] = get_preflop_cluster_id(history[1]) + + # ----- Assign cluster ID for FLOP/TURN/RIVER ----- + community_cards = "" + new_stage = False + stage_i = 0 + for i, action in enumerate(history): + if new_stage: + new_stage = False + if stage_i == 1: + assert len(action) == 6 + community_cards += action + history[i] = get_flop_cluster_id(kmeans_flop, player_cards + community_cards) + + elif stage_i == 2: + assert len(action) == 2 + community_cards += action + history[i] = get_turn_cluster_id(kmeans_turn, player_cards + community_cards) + elif stage_i == 3: + assert len(action) == 2 + community_cards += action + history[i] = get_river_cluster_id(kmeans_river, player_cards + community_cards) + elif action == "/": + new_stage = True + stage_i += 1 + + return history class HoldemInfoSet(base.InfoSet): - """ - Information Sets (InfoSets) cannot be chance histories, nor terminal histories. - This condition is checked when infosets are created. - - This infoset is an abstracted versions of the history in this case. - See the `get_infoSet_key(self)` function for these - - There are 2 abstractions we are doing: - 1. Card Abstraction (grouping together similar hands) - 2. Action Abstraction - - I've imported my abstractions from `abstraction.py`. - - """ - def __init__(self, infoSet: List[Action], actions: List[Action], player: Player): - assert(len(infoSet) >= 2) - super().__init__(infoSet, actions, player) - + """ + Information Sets (InfoSets) cannot be chance histories, nor terminal histories. + This condition is checked when infosets are created. + + This infoset is an abstracted versions of the history in this case. + See the `get_infoSet_key(self)` function for these + + There are 2 abstractions we are doing: + 1. Card Abstraction (grouping together similar hands) + 2. Action Abstraction + + I've imported my abstractions from `abstraction.py`. + + """ + + def __init__(self, infoSet: List[Action], actions: List[Action], player: Player): + assert len(infoSet) >= 2 + super().__init__(infoSet, actions, player) + def create_infoSet(infoSet_key: List[Action], actions: List[Action], player: Player): - """ - We create an information set from a history. - """ - return HoldemInfoSet(infoSet_key, actions, player) - - + """ + We create an information set from a history. + """ + return HoldemInfoSet(infoSet_key, actions, player) + + def create_history(): - return HoldEmHistory() - - + return HoldEmHistory() + + # CFR with abstraction integrated class HoldemAbstractCFR(base.CFR): - def __init__(self, create_infoSet, create_history, kmeans_flop, kmeans_turn, kmeans_river, n_players: int = 2, iterations: int = 1000000,): - super().__init__(create_infoSet, create_history, n_players, iterations) - + def __init__( + self, + create_infoSet, + create_history, + kmeans_flop, + kmeans_turn, + kmeans_river, + n_players: int = 2, + iterations: int = 1000000, + ): + super().__init__(create_infoSet, create_history, n_players, iterations) + if __name__ == "__main__": - kmeans_flop, kmeans_turn, kmeans_river = load_kmeans_classifiers() - # cfr = HoldemAbstractCFR(create_infoSet, create_history) - # cfr.solve() - - """ + kmeans_flop, kmeans_turn, kmeans_river = load_kmeans_classifiers() + # cfr = HoldemAbstractCFR(create_infoSet, create_history) + # cfr.solve() + + """ When we work with these abstractions, we have two types: 1. Action Abstraction 2. Card Abstraction @@ -435,33 +458,27 @@ def __init__(self, create_infoSet, create_history, kmeans_flop, kmeans_turn, kme Both of these are implemented in a different way. """ - - - - - - - hist: HoldEmHistory = create_history() - assert(hist.player() == -1) - hist1 = hist + 'AkTh' - assert(hist1.player() == -1) - hist2 = hist1 + 'QdKd' - assert(hist2.player() == 0) - print(hist2.get_infoSet_key(kmeans_flop, kmeans_turn, kmeans_river)) - hist3 = hist2 + 'b2' - assert(hist3.player() == 1) - hist4 = hist3 + 'c' - assert(hist4.player() == -1) - # Below are chance events, so it doesn't matter which player it is - hist5 = hist4 + '/' - assert(hist5.player() == -1) - hist6 = hist5 + 'QhKsKh' - assert(hist6.player() == 1) - hist7 = hist6 + 'b1' - hist8: HoldEmHistory = hist7 + 'b3' - curr = time.time() - print(hist8.get_infoSet_key(kmeans_flop, kmeans_turn, kmeans_river), time.time() - curr) - - # cfr = base.CFR(create_infoSet, create_history) - # cfr.solve() \ No newline at end of file + hist: HoldEmHistory = create_history() + assert hist.player() == -1 + hist1 = hist + "AkTh" + assert hist1.player() == -1 + hist2 = hist1 + "QdKd" + assert hist2.player() == 0 + print(hist2.get_infoSet_key(kmeans_flop, kmeans_turn, kmeans_river)) + hist3 = hist2 + "b2" + assert hist3.player() == 1 + hist4 = hist3 + "c" + assert hist4.player() == -1 + # Below are chance events, so it doesn't matter which player it is + hist5 = hist4 + "/" + assert hist5.player() == -1 + hist6 = hist5 + "QhKsKh" + assert hist6.player() == 1 + hist7 = hist6 + "b1" + hist8: HoldEmHistory = hist7 + "b3" + curr = time.time() + print(hist8.get_infoSet_key(kmeans_flop, kmeans_turn, kmeans_river), time.time() - curr) + + # cfr = base.CFR(create_infoSet, create_history) + # cfr.solve() diff --git a/src/rps.py b/src/rps.py index 0a6dd27..64878ce 100644 --- a/src/rps.py +++ b/src/rps.py @@ -3,68 +3,71 @@ from typing import NewType, Dict, List, Callable, cast import copy + class RPSHistory(base.History): - def __init__(self, history: List[Action] = []): - self.history = history - - def is_terminal(self): - return len(self.history) == 2 - - def actions(self): - return ['R', 'P', 'S'] - - def player(self): - plays = len(self.history) - return plays % 2 - - def terminal_utility(self, i: Player) -> int: - assert(self.is_terminal()) - p1_choice = self.history[0] - p2_choice = self.history[1] - - p1_idx = "RPS".index(p1_choice) - p2_idx = "RPS".index(p2_choice) - - if p1_idx == p2_idx: - return 0 - elif (p1_idx + 1) % 3 == p2_idx: - return -1 if i == 0 else 1 - else: - return 1 if i == 0 else -1 - - def __add__(self, action: Action): - return RPSHistory(self.history + [action]) - - def get_infoSet_key(self) -> List[Action]: - history = copy.deepcopy(self.history) - if len(history) >= 1: - history[0] = '?' - return history + def __init__(self, history: List[Action] = []): + self.history = history + + def is_terminal(self): + return len(self.history) == 2 + + def actions(self): + return ["R", "P", "S"] + + def player(self): + plays = len(self.history) + return plays % 2 + + def terminal_utility(self, i: Player) -> int: + assert self.is_terminal() + p1_choice = self.history[0] + p2_choice = self.history[1] + + p1_idx = "RPS".index(p1_choice) + p2_idx = "RPS".index(p2_choice) + + if p1_idx == p2_idx: + return 0 + elif (p1_idx + 1) % 3 == p2_idx: + return -1 if i == 0 else 1 + else: + return 1 if i == 0 else -1 + + def __add__(self, action: Action): + return RPSHistory(self.history + [action]) + + def get_infoSet_key(self) -> List[Action]: + history = copy.deepcopy(self.history) + if len(history) >= 1: + history[0] = "?" + return history + class RPSInfoSet(base.InfoSet): - def __init__(self, infoSet: List[Action]): - super().__init__(infoSet) - - def actions(self) -> List[Action]: - return ['R', 'P', 'S'] - - def player(self) -> Player: - plays = len(self.infoSet) - return plays % 2 - + def __init__(self, infoSet: List[Action]): + super().__init__(infoSet) + + def actions(self) -> List[Action]: + return ["R", "P", "S"] + + def player(self) -> Player: + plays = len(self.infoSet) + return plays % 2 + + def create_infoSet(infoSet_key: List[Action]): - """ - We create an information set from a history. - """ - return RPSInfoSet(infoSet_key) - - + """ + We create an information set from a history. + """ + return RPSInfoSet(infoSet_key) + + def create_history(): - return RPSHistory() - + return RPSHistory() + if __name__ == "__main__": - cfr = base.CFR(create_infoSet, create_history, iterations=1_00_000) - # cfr.solve() - # print(cfr.get_expected_value(RPSHistory([]), 0, player_strategy=[1,0,0], opp_strategy=[0.5,0.5,0])) - print(cfr.get_best_response(RPSHistory([]), 0, player_strategy=[0.8,0.2,0])) \ No newline at end of file + cfr = base.CFR(create_infoSet, create_history, iterations=1_00_000) + # cfr.solve() + # print(cfr.get_expected_value(RPSHistory([]), 0, player_strategy=[1,0,0], opp_strategy=[0.5,0.5,0])) + print(cfr.get_best_response(RPSHistory([]), 0, player_strategy=[0.8, 0.2, 0])) diff --git a/src/table.py b/src/table.py index c490f41..3a8edc2 100644 --- a/src/table.py +++ b/src/table.py @@ -1,12 +1,13 @@ # Lookup table mapping an int -> position of "1" bit # Ex: 4 (100) -> 2 + def generate_table(): - TABLE = {} - # Create the table - val = 1 - for i in range(64): - TABLE[val] = i - val *= 2 - - return TABLE \ No newline at end of file + TABLE = {} + # Create the table + val = 1 + for i in range(64): + TABLE[val] = i + val *= 2 + + return TABLE diff --git a/src/treys_test.py b/src/treys_test.py index 77bd0cf..75b0377 100644 --- a/src/treys_test.py +++ b/src/treys_test.py @@ -13,10 +13,10 @@ # hand = deck.draw(2) score = evaluator.evaluate(hand, board) -print(score / 7462) # 7462 is the WORST POSSIBLE SCORE +print(score / 7462) # 7462 is the WORST POSSIBLE SCORE # cls = evaluator.get_rank_class(score) # print(cls) # print(evaluator.class_to_string(cls)) -evaluator.hand_summary(board, [hand1, hand2]) \ No newline at end of file +evaluator.hand_summary(board, [hand1, hand2]) diff --git a/src/utils.py b/src/utils.py index 9a03d93..ddaec61 100644 --- a/src/utils.py +++ b/src/utils.py @@ -1,41 +1,45 @@ """ Some old util code that might be useful in the future """ -import torch # To run K-means with GPU + +import torch # To run K-means with GPU + # Modified version of K-Means to add Earth Mover's Distance from here: https://github.com/subhadarship/kmeans_pytorch/blob/master/kmeans_pytorch/__init__.py # DEPRECATED, using the standard kmeans from scikit-learn since it suits our needs def initialize(X, n_clusters, seed): - """ - initialize cluster centers - :param X: (torch.tensor) matrix - :param n_clusters: (int) number of clusters - :param seed: (int) seed for kmeans - :return: (np.array) initial state - """ - num_samples = len(X) - if seed == None: - indices = np.random.choice(num_samples, n_clusters, replace=False) - else: - np.random.seed(seed) ; indices = np.random.choice(num_samples, n_clusters, replace=False) - initial_state = X[indices] - return initial_state + """ + initialize cluster centers + :param X: (torch.tensor) matrix + :param n_clusters: (int) number of clusters + :param seed: (int) seed for kmeans + :return: (np.array) initial state + """ + num_samples = len(X) + if seed == None: + indices = np.random.choice(num_samples, n_clusters, replace=False) + else: + np.random.seed(seed) + indices = np.random.choice(num_samples, n_clusters, replace=False) + initial_state = X[indices] + return initial_state + def kmeans_custom( - X, - n_clusters, - distance='euclidean', - centroids=[], - tol=1e-3, - tqdm_flag=True, - iter_limit=0, - device=torch.device('cuda' if torch.cuda.is_available() else 'cpu'), - seed=None, + X, + n_clusters, + distance="euclidean", + centroids=[], + tol=1e-3, + tqdm_flag=True, + iter_limit=0, + device=torch.device("cuda" if torch.cuda.is_available() else "cpu"), + seed=None, ): - """ - perform kmeans n_init, default=10 - Number of time the k-means algorithm will be run with different centroid seeds. The final results will be the best output of n_init consecutive runs in terms of inertia. - :param X: (torch.tensor) matrix + """ + perform kmeans n_init, default=10 + Number of time the k-means algorithm will be run with different centroid seeds. The final results will be the best output of n_init consecutive runs in terms of inertia. + :param X: (torch.tensor) matrix :param num_clusters: (int) number of clusters :param distance: (str) distance [options: 'euclidean', 'cosine'] [default: 'euclidean'] :param seed: (int) seed for kmeans @@ -44,218 +48,230 @@ def kmeans_custom( :param tqdm_flag: Allows to turn logs on and off :param iter_limit: hard limit for max number of iterations :param gamma_for_soft_dtw: approaches to (hard) DTW as gamma -> 0 - Return - X_cluster_ids (torch.tensor), centroids (torch.tensor) - """ - if tqdm_flag: - print(f'running k-means on {device}..') - - if distance == 'euclidean': - pairwise_distance_function = partial(pairwise_distance, device=device, tqdm_flag=tqdm_flag) - elif distance == 'cosine': - pairwise_distance_function = partial(pairwise_cosine, device=device) - elif distance == 'EMD': - pairwise_distance_function = partial(pairwise_EMD, device=device) - - else: - raise NotImplementedError - - if type(X) != torch.Tensor: - X = torch.tensor(X) - # convert to float - X = X.float() - - # transfer to device - X = X.to(device) - - # initialize - if type(centroids) == list: # ToDo: make this less annoyingly weird - initial_state = initialize(X, n_clusters, seed=seed) - else: - if tqdm_flag: - print('resuming') - # find data point closest to the initial cluster center - initial_state = centroids - dis = pairwise_distance_function(X, initial_state) - choice_points = torch.argmin(dis, dim=0) - initial_state = X[choice_points] - initial_state = initial_state.to(device) - - iteration = 0 - if tqdm_flag: - tqdm_meter = tqdm(desc='[running kmeans]') - - while True: - dis = pairwise_distance_function(X, initial_state) - - choice_cluster = torch.argmin(dis, dim=1) - - initial_state_pre = initial_state.clone() - - for index in range(n_clusters): - selected = torch.nonzero(choice_cluster == index).squeeze().to(device) - - selected = torch.index_select(X, 0, selected) - - # https://github.com/subhadarship/kmeans_pytorch/issues/16 - if selected.shape[0] == 0: - selected = X[torch.randint(len(X), (1,))] - - initial_state[index] = selected.mean(dim=0) - - center_shift = torch.sum( - torch.sqrt( - torch.sum((initial_state - initial_state_pre) ** 2, dim=1) - )) - - # increment iteration - iteration = iteration + 1 - - # update tqdm meter - if tqdm_flag: - tqdm_meter.set_postfix( - iteration=f'{iteration}', - center_shift=f'{center_shift ** 2:0.6f}', - tol=f'{tol:0.6f}' - ) - tqdm_meter.update() - if center_shift ** 2 < tol: - break - if iter_limit != 0 and iteration >= iter_limit: - break - - return choice_cluster.cpu(), initial_state.cpu() # clusters_indices_on_initial data, final_centroids + Return + X_cluster_ids (torch.tensor), centroids (torch.tensor) + """ + if tqdm_flag: + print(f"running k-means on {device}..") + + if distance == "euclidean": + pairwise_distance_function = partial(pairwise_distance, device=device, tqdm_flag=tqdm_flag) + elif distance == "cosine": + pairwise_distance_function = partial(pairwise_cosine, device=device) + elif distance == "EMD": + pairwise_distance_function = partial(pairwise_EMD, device=device) + + else: + raise NotImplementedError + + if type(X) != torch.Tensor: + X = torch.tensor(X) + # convert to float + X = X.float() + + # transfer to device + X = X.to(device) + + # initialize + if type(centroids) == list: # ToDo: make this less annoyingly weird + initial_state = initialize(X, n_clusters, seed=seed) + else: + if tqdm_flag: + print("resuming") + # find data point closest to the initial cluster center + initial_state = centroids + dis = pairwise_distance_function(X, initial_state) + choice_points = torch.argmin(dis, dim=0) + initial_state = X[choice_points] + initial_state = initial_state.to(device) + + iteration = 0 + if tqdm_flag: + tqdm_meter = tqdm(desc="[running kmeans]") + + while True: + dis = pairwise_distance_function(X, initial_state) + + choice_cluster = torch.argmin(dis, dim=1) + + initial_state_pre = initial_state.clone() + + for index in range(n_clusters): + selected = torch.nonzero(choice_cluster == index).squeeze().to(device) + + selected = torch.index_select(X, 0, selected) + + # https://github.com/subhadarship/kmeans_pytorch/issues/16 + if selected.shape[0] == 0: + selected = X[torch.randint(len(X), (1,))] + + initial_state[index] = selected.mean(dim=0) + + center_shift = torch.sum( + torch.sqrt(torch.sum((initial_state - initial_state_pre) ** 2, dim=1)) + ) + + # increment iteration + iteration = iteration + 1 + + # update tqdm meter + if tqdm_flag: + tqdm_meter.set_postfix( + iteration=f"{iteration}", + center_shift=f"{center_shift ** 2:0.6f}", + tol=f"{tol:0.6f}", + ) + tqdm_meter.update() + if center_shift**2 < tol: + break + if iter_limit != 0 and iteration >= iter_limit: + break + + return ( + choice_cluster.cpu(), + initial_state.cpu(), + ) # clusters_indices_on_initial data, final_centroids + def kmeans_custom_predict( - X, - centroids, - distance='euclidean', - device=torch.device('cuda' if torch.cuda.is_available() else 'cpu'), - tqdm_flag=True + X, + centroids, + distance="euclidean", + device=torch.device("cuda" if torch.cuda.is_available() else "cpu"), + tqdm_flag=True, +): + """ + Return + cluster_ids_on_X (torch.tensor) + + """ + if tqdm_flag: + print(f"predicting on {device}..") + + if distance == "euclidean": + pairwise_distance_function = partial(pairwise_distance, device=device, tqdm_flag=tqdm_flag) + elif distance == "cosine": + pairwise_distance_function = partial(pairwise_cosine, device=device) + elif distance == "EMD": + pairwise_distance_function = partial(pairwise_EMD, device=device) + else: + raise NotImplementedError + + # convert to float + if type(X) != torch.Tensor: + X = torch.tensor(X) + X = X.float() + + # transfer to device + X = X.to(device) + + dis = pairwise_distance_function(X, centroids) + if len(dis.shape) == 1: # Prediction on a single data + choice_cluster = torch.argmin(dis) + + else: + choice_cluster = torch.argmin(dis, dim=1) + + return choice_cluster.cpu() + + +def pairwise_distance( + data1, + data2, + device=torch.device("cuda" if torch.cuda.is_available() else "cpu"), + tqdm_flag=True, ): - """ - Return - cluster_ids_on_X (torch.tensor) - - """ - if tqdm_flag: - print(f'predicting on {device}..') - - if distance == 'euclidean': - pairwise_distance_function = partial(pairwise_distance, device=device, tqdm_flag=tqdm_flag) - elif distance == 'cosine': - pairwise_distance_function = partial(pairwise_cosine, device=device) - elif distance == 'EMD': - pairwise_distance_function = partial(pairwise_EMD, device=device) - else: - raise NotImplementedError - - # convert to float - if type(X) != torch.Tensor: - X = torch.tensor(X) - X = X.float() - - # transfer to device - X = X.to(device) - - dis = pairwise_distance_function(X, centroids) - if (len(dis.shape) == 1): # Prediction on a single data - choice_cluster = torch.argmin(dis) - - else: - choice_cluster = torch.argmin(dis, dim=1) - - return choice_cluster.cpu() - - -def pairwise_distance(data1, data2, device=torch.device('cuda' if torch.cuda.is_available() else 'cpu'), tqdm_flag=True): - if tqdm_flag: - print(f'device is :{device}') - - # transfer to device - data1, data2 = data1.to(device), data2.to(device) - - A = data1.unsqueeze(dim=1) # N*1*M - B = data2.unsqueeze(dim=0) # 1*N*M - - dis = (A - B) ** 2.0 - # return N*N matrix for pairwise distance - dis = dis.sum(dim=-1).squeeze() - return dis - - -def pairwise_cosine(data1, data2, device=torch.device('cuda' if torch.cuda.is_available() else 'cpu')): - # transfer to device - data1, data2 = data1.to(device), data2.to(device) - - A = data1.unsqueeze(dim=1) # N*1*M - B = data2.unsqueeze(dim=0) # 1*N*M - - # normalize the points | [0.3, 0.4] -> [0.3/sqrt(0.09 + 0.16), 0.4/sqrt(0.09 + 0.16)] = [0.3/0.5, 0.4/0.5] - A_normalized = A / A.norm(dim=-1, keepdim=True) - B_normalized = B / B.norm(dim=-1, keepdim=True) - - cosine = A_normalized * B_normalized - - # return N*N matrix for pairwise distance - cosine_dis = 1 - cosine.sum(dim=-1).squeeze() - return cosine_dis - - -def pairwise_EMD(data1, data2, device=torch.device('cuda' if torch.cuda.is_available() else 'cpu')): - assert(len(data1.shape) == 2) - assert(len(data2.shape) == 2) - assert(data1.shape[1] == data2.shape[1]) - n = data1.shape[1] - pos_a = torch.tensor([[i] for i in range(n)]) - pos_b = torch.tensor([[i] for i in range(n)]) - C = ot.dist(pos_a, pos_b, metric='euclidean') - C.to(device) - - # Correct solution, but very slow - dist = torch.zeros((data1.shape[0], data2.shape[0])) - for i, hist_a in enumerate(data1): - for j, hist_b in enumerate(data2): - for _ in range(10): # Janky fix for small precision error - try: - ot_emd = ot.emd(hist_a, hist_b, C, numThreads="max") # The precision is set to 7, so sometimes the sum doesn't get to precisely 1. - break - except Exception as e: - print(e) - continue - - transport_cost_matrix = ot_emd * C - dist[i][j] = transport_cost_matrix.sum() - - return dist + if tqdm_flag: + print(f"device is :{device}") + + # transfer to device + data1, data2 = data1.to(device), data2.to(device) + + A = data1.unsqueeze(dim=1) # N*1*M + B = data2.unsqueeze(dim=0) # 1*N*M + + dis = (A - B) ** 2.0 + # return N*N matrix for pairwise distance + dis = dis.sum(dim=-1).squeeze() + return dis + + +def pairwise_cosine( + data1, data2, device=torch.device("cuda" if torch.cuda.is_available() else "cpu") +): + # transfer to device + data1, data2 = data1.to(device), data2.to(device) + + A = data1.unsqueeze(dim=1) # N*1*M + B = data2.unsqueeze(dim=0) # 1*N*M + + # normalize the points | [0.3, 0.4] -> [0.3/sqrt(0.09 + 0.16), 0.4/sqrt(0.09 + 0.16)] = [0.3/0.5, 0.4/0.5] + A_normalized = A / A.norm(dim=-1, keepdim=True) + B_normalized = B / B.norm(dim=-1, keepdim=True) + + cosine = A_normalized * B_normalized + + # return N*N matrix for pairwise distance + cosine_dis = 1 - cosine.sum(dim=-1).squeeze() + return cosine_dis -def kmeans_search(X): - """ - We can check for the quality of clustering by checking the inter-cluster distance, using the - same metric that we used for EMD. - - At some point, there is no point in increasing the number of clusters, since we don't really - get more information. - """ - # Search for the optimal number of clusters through a grid like search - - if type(X) != torch.Tensor: - X = torch.tensor(X) - # convert to float - X = X.float() - - # n_clusters = [10, 25, 50, 100, 200, 1000, 5000] - n_clusters = [5000] - for n_cluster in n_clusters: - cluster_indices, centroids = kmeans(X, n_cluster) - X_cluster_centroids = centroids[cluster_indices] - distances = 0 - for i, X_cluster_centroid in enumerate(X_cluster_centroids): - distances += pairwise_distance(torch.unsqueeze(X_cluster_centroid, axis=0), torch.unsqueeze(X[i], axis=0), tqdm_flag=False) - print(f"Sum of cluster to data distance {distances}") - print(f"Mean cluster to data distance {distances / X_cluster_centroids.shape[0]}") - - - +def pairwise_EMD(data1, data2, device=torch.device("cuda" if torch.cuda.is_available() else "cpu")): + assert len(data1.shape) == 2 + assert len(data2.shape) == 2 + assert data1.shape[1] == data2.shape[1] + n = data1.shape[1] + pos_a = torch.tensor([[i] for i in range(n)]) + pos_b = torch.tensor([[i] for i in range(n)]) + C = ot.dist(pos_a, pos_b, metric="euclidean") + C.to(device) + # Correct solution, but very slow + dist = torch.zeros((data1.shape[0], data2.shape[0])) + for i, hist_a in enumerate(data1): + for j, hist_b in enumerate(data2): + for _ in range(10): # Janky fix for small precision error + try: + ot_emd = ot.emd( + hist_a, hist_b, C, numThreads="max" + ) # The precision is set to 7, so sometimes the sum doesn't get to precisely 1. + break + except Exception as e: + print(e) + continue + + transport_cost_matrix = ot_emd * C + dist[i][j] = transport_cost_matrix.sum() + + return dist + + +def kmeans_search(X): + """ + We can check for the quality of clustering by checking the inter-cluster distance, using the + same metric that we used for EMD. + + At some point, there is no point in increasing the number of clusters, since we don't really + get more information. + """ + # Search for the optimal number of clusters through a grid like search + + if type(X) != torch.Tensor: + X = torch.tensor(X) + # convert to float + X = X.float() + + # n_clusters = [10, 25, 50, 100, 200, 1000, 5000] + n_clusters = [5000] + for n_cluster in n_clusters: + cluster_indices, centroids = kmeans(X, n_cluster) + X_cluster_centroids = centroids[cluster_indices] + distances = 0 + for i, X_cluster_centroid in enumerate(X_cluster_centroids): + distances += pairwise_distance( + torch.unsqueeze(X_cluster_centroid, axis=0), + torch.unsqueeze(X[i], axis=0), + tqdm_flag=False, + ) + print(f"Sum of cluster to data distance {distances}") + print(f"Mean cluster to data distance {distances / X_cluster_centroids.shape[0]}")