2022-09-05 11:46:08 +00:00
import mlagents_envs
from mlagents_envs.base_env import ActionTuple
from mlagents_envs.environment import UnityEnvironment
import numpy as np
class makeEnv(object):
def __init__(self, envPath, workerID, basePort):
    """Launch the Unity environment and cache the specs of its first behavior.

    Args:
        envPath: Passed to ``UnityEnvironment`` as ``file_name``.
        workerID: Passed to ``UnityEnvironment`` as ``worker_id``.
        basePort: Passed to ``UnityEnvironment`` as ``base_port``.
    """
    self.env = UnityEnvironment(
        file_name=envPath,
        seed=1,
        side_channels=[],
        worker_id=workerID,
        base_port=basePort,
    )
    # The ML-Agents examples call reset() before reading behavior_specs;
    # without it the mapping may still be empty and indexing [0] would fail.
    self.env.reset()

    # Number of trailing observation entries that are not part of the state.
    # getSteps() strips the last three values (two load-direction values and
    # one save flag), so this has to be 3.
    self.LOAD_DIR_SIZE_IN_STATE = 3
    # -1 is the "no agent chosen yet" sentinel that getSteps() tests for.
    self.TRACKED_AGENT = -1

    # get environment specs
    self.BEHA_SPECS = self.env.behavior_specs
    self.BEHA_NAME = list(self.BEHA_SPECS)[0]
    self.SPEC = self.BEHA_SPECS[self.BEHA_NAME]
    self.OBSERVATION_SPECS = self.SPEC.observation_specs[0]  # observation spec
    self.ACTION_SPEC = self.SPEC.action_spec  # action specs
    self.DISCRETE_SIZE = self.ACTION_SPEC.discrete_size  # size of the discrete actions
    self.CONTINUOUS_SIZE = self.ACTION_SPEC.continuous_size  # size of the continuous actions
    # number of observation values left once the trailing extras are removed
    self.STATE_SIZE = self.OBSERVATION_SPECS.shape[0] - self.LOAD_DIR_SIZE_IN_STATE
    print("√√√√√Enviroment Initialized Success√√√√√")
def step(self, discreteActions=None, continuousActions=None, behaviorName=None, trackedAgent=None):
    """Send one action to the environment, advance it, and read the result.

    Args:
        discreteActions: Array of discrete actions, or None to send the
            ``[[0]]`` placeholder.
        continuousActions: Array of continuous actions, or None to send the
            ``[[0]]`` placeholder.
        behaviorName: Behavior to act on; defaults to ``self.BEHA_NAME``.
        trackedAgent: Agent whose result is returned; when None, getSteps()
            falls back to ``self.TRACKED_AGENT``.

    Returns:
        The ``(nextState, reward, done, loadDir, saveNow)`` tuple produced by
        ``getSteps`` after the environment has stepped.
    """

    def _contains_none(actions):
        # Only object arrays can hold None entries; numeric arrays never do.
        return any(item is None for item in np.asarray(actions, dtype=object).ravel())

    # A missing argument is silently replaced by the placeholder; an array
    # that contains None is reported first, then replaced.
    if discreteActions is not None and _contains_none(discreteActions):
        print("step() Error!:discreteActions include None")
        discreteActions = None
    if continuousActions is not None and _contains_none(continuousActions):
        print("step() Error!:continuousActions include None")
        continuousActions = None

    if discreteActions is None:
        # nothing to send: give the placeholder [[0]] to the environment
        discreteActions = np.array([[0]], dtype=np.int32)
    if continuousActions is None:
        # nothing to send: give the placeholder [[0]] to the environment
        continuousActions = np.array([[0]], dtype=np.float32)

    if behaviorName is None:
        behaviorName = self.BEHA_NAME

    # create the action tuple and actually apply it to the environment
    thisActionTuple = ActionTuple(continuous=continuousActions, discrete=discreteActions)
    self.env.set_actions(behaviorName, thisActionTuple)
    self.env.step()

    # trackedAgent is forwarded unresolved so that getSteps() can pick the
    # tracked agent first if none has been chosen yet.
    return self.getSteps(behaviorName, trackedAgent)
2022-09-05 11:46:08 +00:00
def getSteps(self, behaviorName=None, trackedAgent=None):
    """Read the latest step of the tracked agent from the environment.

    Args:
        behaviorName: Behavior to query; defaults to ``self.BEHA_NAME``.
        trackedAgent: Agent id to read; defaults to ``self.TRACKED_AGENT``.

    Returns:
        ``(nextState, reward, done, loadDir, saveNow)`` where the first
        observation is flattened and split into the state (everything except
        the last three values), ``loadDir`` (the two values before the last)
        and ``saveNow`` (the last value). ``done`` is True when the agent was
        found in the terminal steps.

    Raises:
        RuntimeError: If the agent is in neither the decision steps nor the
            terminal steps, so there is nothing to return.
    """
    if behaviorName is None:
        behaviorName = self.BEHA_NAME
    decisionSteps, terminalSteps = self.env.get_steps(behaviorName)

    # No agent chosen yet: track the first one that requests a decision.
    if self.TRACKED_AGENT == -1 and len(decisionSteps) >= 1:
        self.TRACKED_AGENT = decisionSteps.agent_id[0]
    if trackedAgent is None:
        trackedAgent = self.TRACKED_AGENT

    fullSize = self.STATE_SIZE + self.LOAD_DIR_SIZE_IN_STATE
    result = None
    # While the episode is running the agent is found in the decision steps;
    # once it has ended it is found in the terminal steps. Terminal steps are
    # examined last so they take precedence when the agent is in both.
    for steps, done in ((decisionSteps, False), (terminalSteps, True)):
        if trackedAgent not in steps:
            continue
        agentStep = steps[trackedAgent]
        flatObs = np.reshape(agentStep.obs[0], [1, fullSize])[0]
        saveNow = flatObs[-1]
        loadDir = flatObs[-3:-1]
        nextState = flatObs[:-3]
        result = (nextState, agentStep.reward, done, loadDir, saveNow)

    if result is None:
        raise RuntimeError(
            f"getSteps(): agent {trackedAgent!r} is in neither the decision "
            f"steps nor the terminal steps of behavior {behaviorName!r}"
        )
    return result
2022-09-05 11:46:08 +00:00
def reset(self):
    """Reset the environment and return the tracked agent's first step.

    Returns:
        The ``(nextState, reward, done, loadDir, saveNow)`` tuple produced by
        ``getSteps`` right after the reset.
    """
    # Without this call the method would only re-read the current step and
    # the episode would never actually restart.
    self.env.reset()
    return self.getSteps()
2022-09-05 11:46:08 +00:00
def render(self):