import uuid
from typing import List, Tuple

import gym
import numpy as np
from numpy import ndarray
from mlagents_envs.base_env import ActionTuple
from mlagents_envs.environment import UnityEnvironment
from mlagents_envs.side_channel.side_channel import (
    SideChannel,
    IncomingMessage,
    OutgoingMessage,
)

import airecorder


class Aimbot(gym.Env):
    """Gym-style wrapper around a Unity ML-Agents aimbot environment."""

    def __init__(
        self,
        envPath: str,
        workerID: int = 1,
        basePort: int = 100,
        side_channels: list = None,
    ):
        """Connect to the Unity executable and cache its behavior specs.

        Args:
            envPath: path to the Unity environment executable.
            workerID: worker index (offsets the communication port).
            basePort: base communication port for the Unity connection.
            side_channels: optional list of SideChannel instances.
        """
        super(Aimbot, self).__init__()
        # FIX: the original signature used a mutable default argument
        # (side_channels: list = []), which is shared across every call;
        # use a None sentinel and build a fresh list instead.
        if side_channels is None:
            side_channels = []
        self.env = UnityEnvironment(
            file_name=envPath,
            seed=1,
            side_channels=side_channels,
            worker_id=workerID,
            base_port=basePort,
        )
        self.env.reset()
        # all behavior_specs
        self.unity_specs = self.env.behavior_specs
        # environment behavior name
        self.unity_beha_name = list(self.unity_specs)[0]
        # environment behavior spec
        self.unity_specs = self.unity_specs[self.unity_beha_name]
        # environment observation_space
        self.unity_obs_specs = self.unity_specs.observation_specs[0]
        # environment action specs
        self.unity_action_spec = self.unity_specs.action_spec
        # environment sample observation
        decisionSteps, _ = self.env.get_steps(self.unity_beha_name)
        # OBSERVATION SPECS
        # environment state shape, e.g. tuple (93,)
        self.unity_observation_shape = self.unity_obs_specs.shape
        # ACTION SPECS
        # number of continuous actions (int)
        self.unity_continuous_size = self.unity_action_spec.continuous_size
        # discrete action branch sizes, e.g. (3, 3, 2)
        self.unity_discrete_branches = self.unity_action_spec.discrete_branches
        # number of discrete branches, e.g. 3
        self.unity_discrete_type = self.unity_action_spec.discrete_size
        # total discrete choices across branches, e.g. 3+3+2 = 8
        self.unity_discrete_size = sum(self.unity_discrete_branches)
        # total action size, e.g. 3+2 = 5
        self.unity_action_size = self.unity_discrete_type + self.unity_continuous_size
        # whether each action family exists
        self.unity_dis_act_exist = self.unity_discrete_type != 0
        self.unity_con_act_exist = self.unity_continuous_size != 0
        # AGENT SPECS
        # all agent IDs
        self.unity_agent_IDS = decisionSteps.agent_id
        # number of agents
        self.unity_agent_num = len(self.unity_agent_IDS)

    def reset(self) -> Tuple[np.ndarray, List, List]:
        """Reset the environment and return the initial observations.

        Returns:
            Tuple of (nextState, reward, done).
        """
        self.env.reset()
        nextState, reward, done = self.get_steps()
        return nextState, reward, done

    def step(
        self,
        actions: ndarray,
    ) -> Tuple[np.ndarray, List, List]:
        """Convert an action array to an ActionTuple and advance the env.

        Args:
            actions: PPO chooseAction output, shape (agentNum, actionNum);
                discrete columns come first, continuous columns after.

        Returns:
            Tuple of (nextState, reward, done) observed after the action.
        """
        # discrete action
        if self.unity_dis_act_exist:
            # slice the discrete columns from the action matrix
            discreteActions = actions[:, 0 : self.unity_discrete_type]
        else:
            # dummy discrete action when the env defines none
            discreteActions = np.asarray([[0]])
        # continuous action
        if self.unity_con_act_exist:
            # remaining columns are the continuous actions
            continuousActions = actions[:, self.unity_discrete_type :]
        else:
            # dummy continuous action when the env defines none
            continuousActions = np.asarray([[0.0]])
        # create actionTuple and send it to the environment
        thisActionTuple = ActionTuple(
            continuous=continuousActions, discrete=discreteActions
        )
        self.env.set_actions(behavior_name=self.unity_beha_name, action=thisActionTuple)
        self.env.step()
        # get nextState & reward & done after this action
        nextStates, rewards, dones = self.get_steps()
        return nextStates, rewards, dones

    def get_steps(self) -> Tuple[np.ndarray, List, List]:
        """Collect current state, reward and done flag for every agent.

        Returns:
            Tuple of (nextState, reward, done).
        """
        decision_steps, terminal_steps = self.env.get_steps(self.unity_beha_name)
        next_states = []
        dones = []
        rewards = []
        for this_agent_ID in self.unity_agent_IDS:
            # When an episode ends, the agent ID appears in BOTH
            # decision_steps and terminal_steps; the agent_exist flag
            # ensures each agent contributes exactly one entry.
            agent_exist = False
            # game done
            if this_agent_ID in terminal_steps:
                next_states.append(terminal_steps[this_agent_ID].obs[0])
                dones.append(True)
                rewards.append(terminal_steps[this_agent_ID].reward)
                agent_exist = True
            # game not over yet and agent not already recorded
            if (this_agent_ID in decision_steps) and (not agent_exist):
                next_states.append(decision_steps[this_agent_ID].obs[0])
                dones.append(False)
                rewards.append(decision_steps[this_agent_ID].reward)
        return np.asarray(next_states), rewards, dones

    def close(self):
        """Shut down the Unity environment."""
        self.env.close()


class AimbotSideChannel(SideChannel):
    """Side channel relaying result/error messages between Unity and Python."""

    def __init__(self, channel_id: uuid.UUID) -> None:
        super().__init__(channel_id)

    def on_message_received(self, msg: IncomingMessage) -> None:
        """Handle an incoming message from Unity.

        Messages are "|"-delimited, e.g.
        "result|<name>|Win" or "Error|Message1|Message2|Message3".
        Round statistics are accumulated into the airecorder counters.
        """
        this_message = msg.read_string()
        this_result = this_message.split("|")
        if this_result[0] == "result":
            airecorder.total_rounds[this_result[1]] += 1
            if this_result[2] == "Win":
                airecorder.win_rounds[this_result[1]] += 1
        elif this_result[0] == "Error":
            print(this_message)

    # Send helpers: queue a typed payload for delivery to the C# side.
    def send_string(self, data: str) -> None:
        msg = OutgoingMessage()
        msg.write_string(data)
        super().queue_message_to_send(msg)

    def send_bool(self, data: bool) -> None:
        msg = OutgoingMessage()
        msg.write_bool(data)
        super().queue_message_to_send(msg)

    def send_int(self, data: int) -> None:
        msg = OutgoingMessage()
        msg.write_int32(data)
        super().queue_message_to_send(msg)

    def send_float(self, data: float) -> None:
        msg = OutgoingMessage()
        msg.write_float32(data)
        super().queue_message_to_send(msg)

    def send_float_list(self, data: List[float]) -> None:
        msg = OutgoingMessage()
        msg.write_float32_list(data)
        super().queue_message_to_send(msg)