from mlagents_envs.base_env import ActionTuple
from mlagents_envs.environment import UnityEnvironment
import numpy as np


class makeEnv(object):
    def __init__(self, envPath, workerID, basePort):
        self.env = UnityEnvironment(file_name=envPath, seed=1, side_channels=[],
                                    worker_id=workerID, base_port=basePort)
        self.env.reset()
        # get environment specs
        self.LOAD_DIR_SIZE_IN_STATE = 2
        self.TRACKED_AGENT = -1
        self.BEHA_SPECS = self.env.behavior_specs
        self.BEHA_NAME = list(self.BEHA_SPECS)[0]
        self.SPEC = self.BEHA_SPECS[self.BEHA_NAME]
        self.OBSERVATION_SPECS = self.SPEC.observation_specs[0]  # observation spec
        self.ACTION_SPEC = self.SPEC.action_spec  # action spec
        self.DISCRETE_SIZE = self.ACTION_SPEC.discrete_size  # size of the discrete action space
        self.CONTINUOUS_SIZE = self.ACTION_SPEC.continuous_size  # size of the continuous action space
        # number of observation values, excluding the trailing load-direction entries
        self.STATE_SIZE = self.OBSERVATION_SPECS.shape[0] - self.LOAD_DIR_SIZE_IN_STATE
        print("√√√√√ Environment initialized successfully √√√√√")

    def step(self, discreteActions=None, continuousActions=None, behaviorName=None, trackedAgent=None):
        # apply an action to the environment and return nextState, reward, done, loadDir
        if discreteActions is None:
            # no discrete action supplied: send a single no-op [[0]] to the environment
            discreteActions = np.array([[0]], dtype=np.int32)
        if continuousActions is None:
            # no continuous action supplied: send a single no-op [[0.0]] to the environment
            continuousActions = np.array([[0.0]], dtype=np.float32)
        if behaviorName is None:
            behaviorName = self.BEHA_NAME
        if trackedAgent is None:
            trackedAgent = self.TRACKED_AGENT
        # create the ActionTuple and apply it to the environment
        thisActionTuple = ActionTuple(continuous=continuousActions, discrete=discreteActions)
        self.env.set_actions(behavior_name=behaviorName, action=thisActionTuple)
        self.env.step()
        # get nextState, reward and done after this action
        nextState, reward, done, loadDir = self.getSteps(behaviorName, trackedAgent)
        return nextState, reward, done, loadDir

    def getSteps(self, behaviorName=None, trackedAgent=None):
        # read nextState, reward and done for the tracked agent
        if behaviorName is None:
            behaviorName = self.BEHA_NAME
        decisionSteps, terminalSteps = self.env.get_steps(behaviorName)
        if self.TRACKED_AGENT == -1 and len(decisionSteps) >= 1:
            self.TRACKED_AGENT = decisionSteps.agent_id[0]
        if trackedAgent is None:
            trackedAgent = self.TRACKED_AGENT
        if trackedAgent in decisionSteps:
            # episode still running: the environment state is stored in decision_steps
            nextState = decisionSteps[trackedAgent].obs[0]
            nextState = np.reshape(nextState, [1, self.STATE_SIZE + self.LOAD_DIR_SIZE_IN_STATE])
            loadDir = nextState[0][-2:]
            nextState = nextState[0][:-2]
            reward = decisionSteps[trackedAgent].reward
            done = False
        if trackedAgent in terminalSteps:
            # episode finished: the environment state is stored in terminal_steps
            nextState = terminalSteps[trackedAgent].obs[0]
            nextState = np.reshape(nextState, [1, self.STATE_SIZE + self.LOAD_DIR_SIZE_IN_STATE])
            loadDir = nextState[0][-2:]
            nextState = nextState[0][:-2]
            reward = terminalSteps[trackedAgent].reward
            done = True
        return nextState, reward, done, loadDir

    def reset(self):
        self.env.reset()
        nextState, reward, done, loadDir = self.getSteps()
        return nextState, reward, done, loadDir

    def render(self):
        # thin pass-through; note that mlagents_envs' UnityEnvironment may not expose render()
        self.env.render()
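

# Minimal usage sketch, assuming a Unity build exists on disk: the path "./UnityBuild/env",
# worker id 0 and base port 5005 below are placeholder assumptions, not values from this repo.
# It runs a short random-action rollout through makeEnv and then closes the Unity process.
if __name__ == "__main__":
    env = makeEnv(envPath="./UnityBuild/env", workerID=0, basePort=5005)
    state, reward, done, loadDir = env.reset()
    for _ in range(100):
        # sample a random, correctly shaped action for one agent from the action spec
        randomAction = env.ACTION_SPEC.random_action(1)
        state, reward, done, loadDir = env.step(discreteActions=randomAction.discrete,
                                                continuousActions=randomAction.continuous)
        if done:
            state, reward, done, loadDir = env.reset()
    env.env.close()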