import mlagents_envs
from mlagents_envs.base_env import ActionTuple
from mlagents_envs.environment import UnityEnvironment
import numpy as np


class makeEnv(object):
    def __init__(self, envPath, workerID, basePort):
        self.env = UnityEnvironment(
            file_name=envPath,
            seed=1,
            side_channels=[],
            worker_id=workerID,
            base_port=basePort,
        )
        self.env.reset()

        # get environment specs
        self.LOAD_DIR_SIZE_IN_STATE = 3
        self.TRACKED_AGENT = -1
        self.BEHA_SPECS = self.env.behavior_specs
        self.BEHA_NAME = list(self.BEHA_SPECS)[0]
        self.SPEC = self.BEHA_SPECS[self.BEHA_NAME]
        self.OBSERVATION_SPECS = self.SPEC.observation_specs[0]  # observation spec
        self.ACTION_SPEC = self.SPEC.action_spec  # action spec
        self.DISCRETE_SIZE = self.ACTION_SPEC.discrete_size  # size of the discrete action space
        self.CONTINUOUS_SIZE = self.ACTION_SPEC.continuous_size  # size of the continuous action space
        self.STATE_SIZE = (
            self.OBSERVATION_SPECS.shape[0] - self.LOAD_DIR_SIZE_IN_STATE
        )  # number of observation values in the state
        print("√√√√√Environment Initialized Successfully√√√√√")

    def step(
        self,
        discreteActions=None,
        continuousActions=None,
        behaviorName=None,
        trackedAgent=None,
    ):
        # apply the actions to the environment and
        # return nextState, reward, done, loadDir, saveNow

        # if an action array is missing, pass a placeholder [[0]] to the environment
        if discreteActions is None:
            print("step() Warning: discreteActions is None, using placeholder")
            discreteActions = np.array([[0]], dtype=np.int32)
        if continuousActions is None:
            print("step() Warning: continuousActions is None, using placeholder")
            continuousActions = np.array([[0]], dtype=np.float32)
        if behaviorName is None:
            behaviorName = self.BEHA_NAME
        if trackedAgent is None:
            trackedAgent = self.TRACKED_AGENT

        # create the ActionTuple and send it to the environment
        thisActionTuple = ActionTuple(
            continuous=continuousActions, discrete=discreteActions
        )
        self.env.set_actions(behavior_name=behaviorName, action=thisActionTuple)
        self.env.step()

        # get nextState, reward and done after this action
        nextState, reward, done, loadDir, saveNow = self.getSteps(
            behaviorName, trackedAgent
        )
        return nextState, reward, done, loadDir, saveNow

    def getSteps(self, behaviorName=None, trackedAgent=None):
        # get nextState, reward and done
        if behaviorName is None:
            behaviorName = self.BEHA_NAME
        decisionSteps, terminalSteps = self.env.get_steps(behaviorName)
        if self.TRACKED_AGENT == -1 and len(decisionSteps) >= 1:
            self.TRACKED_AGENT = decisionSteps.agent_id[0]
        if trackedAgent is None:
            trackedAgent = self.TRACKED_AGENT

        if trackedAgent in decisionSteps:
            # while the episode is still running, the state is stored in decision_steps
            nextState = decisionSteps[trackedAgent].obs[0]
            nextState = np.reshape(
                nextState, [1, self.STATE_SIZE + self.LOAD_DIR_SIZE_IN_STATE]
            )
            saveNow = nextState[0][-1]
            loadDir = nextState[0][-3:-1]
            nextState = nextState[0][:-3]
            reward = decisionSteps[trackedAgent].reward
            done = False
        if trackedAgent in terminalSteps:
            # once the episode has ended, the state is stored in terminal_steps
            nextState = terminalSteps[trackedAgent].obs[0]
            nextState = np.reshape(
                nextState, [1, self.STATE_SIZE + self.LOAD_DIR_SIZE_IN_STATE]
            )
            saveNow = nextState[0][-1]
            loadDir = nextState[0][-3:-1]
            nextState = nextState[0][:-3]
            reward = terminalSteps[trackedAgent].reward
            done = True
        return nextState, reward, done, loadDir, saveNow

    def reset(self):
        self.env.reset()
        nextState, reward, done, loadDir, saveNow = self.getSteps()
        return nextState, reward, done, loadDir, saveNow

    def render(self):
        self.env.render()
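

# --- Usage sketch (assumption, not part of the original module) ----------
# A minimal example of how makeEnv might be driven for a single episode.
# The build path "UnityBuild/env.x86_64" is a hypothetical placeholder;
# the action shapes come from the specs read in __init__.
if __name__ == "__main__":
    env = makeEnv(envPath="UnityBuild/env.x86_64", workerID=0, basePort=5005)
    state, reward, done, loadDir, saveNow = env.reset()
    while not done:
        # build placeholder actions with the sizes the behavior expects
        discrete = np.zeros((1, env.DISCRETE_SIZE), dtype=np.int32)
        continuous = np.random.uniform(
            -1.0, 1.0, (1, env.CONTINUOUS_SIZE)
        ).astype(np.float32)
        state, reward, done, loadDir, saveNow = env.step(
            discreteActions=discrete, continuousActions=continuous
        )
    env.env.close()  # close the underlying UnityEnvironment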