import mlagents_envs
from mlagents_envs.base_env import ActionTuple
from mlagents_envs.environment import UnityEnvironment
import argparse
import tensorflow as tf
import tensorflow_addons as tfa
import tensorboard
import numpy as np
import matplotlib.pyplot as plt
import time
import datetime
from collections import deque
from IPython.display import clear_output

print("ML-Agents Version :", mlagents_envs.__version__)
print("TensorFlow Version:", tf.__version__)

# Environment parameters
log_dir = "ML-logs/" + datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
tb_callback = tf.keras.callbacks.TensorBoard(log_dir=log_dir)
env_path = './ScrollBall-Build/ML-ScrollBall-Sample'
brain_name = 'RollerBallBrain'
parser = argparse.ArgumentParser()
parser.add_argument("--env_path", default=env_path)
parser.add_argument("-train", action="store_true", default=False)
parser.add_argument("-silent", action="store_true", default=False)
args = parser.parse_args()

# Acquire the game environment
env = UnityEnvironment(file_name=args.env_path, seed=1, side_channels=[])
env.reset()

# Get the environment specs
tracked_agent = -1
behavior_specs = env.behavior_specs
behavior_name = list(behavior_specs)[0]
spec = behavior_specs[behavior_name]
observation_specs = spec.observation_specs[0]  # observation spec
action_spec = spec.action_spec  # action spec

ENV_Discrete_ACTION_SIZE = action_spec.discrete_size  # number of discrete action branches
ENV_Continuous_ACTION_SIZE = action_spec.continuous_size  # number of continuous actions
STATE_SIZE = observation_specs.shape[0]  # number of observations per step
SAVE_STEPS = 100  # save the networks every SAVE_STEPS steps
ACTION_SIZE = ENV_Discrete_ACTION_SIZE * 3  # total outputs: each action branch has three possible values, hence *3
MAX_EXP_NUM = 2500  # maximum number of transitions kept in the experience pool
EPSILON_CUT_STEP = 1300
EPISODES = 500
REPLACE_STEPS = 50
BATCH_SIZE = 256
LEARNING_RATE = 0.0005
GAMMA = 0.9
epsilon = 1
epsilon_min = 0.01
print("Observations per step:", STATE_SIZE)
print("Discrete action branches per step:", ENV_Discrete_ACTION_SIZE)


# Experience pool
class experiencePool:
    def __init__(self):
        self.exp_pool = deque(maxlen=MAX_EXP_NUM)

    def add(self, state, action, reward, next_state, done):
        self.exp_pool.append((state, action, reward, next_state, done))

    def get_random(self, num=1):
        random_index = np.random.choice(len(self.exp_pool), num)
        random_exps = [self.exp_pool[i] for i in random_index]
        return random_exps

    def get_len(self):
        return len(self.exp_pool)


# DQN agent
class DQN:
    def __init__(self, load, load_dir):
        self.learning_rate = LEARNING_RATE
        self.epsilon = 1
        self.epsilon_min = 0.01
        self.epsilon_cut = (1 - self.epsilon_min) / EPSILON_CUT_STEP
        self.gamma = GAMMA
        if load:
            # Load existing networks
            self.epsilon = self.epsilon_min
            main_load_dir = load_dir + "main.h5"
            target_load_dir = load_dir + "target.h5"
            self.main_net, self.target_net = self.loadNN(main_load_dir, target_load_dir)
        else:
            # Build new main and target networks
            self.main_net = self.build_net()
            self.target_net = self.build_net()
        self.exp_pool = experiencePool()

    # ---------------------------------------------------------------------------------
    def build_net(self):
        # Build the network
        rectifiedAdam = tfa.optimizers.RectifiedAdam(learning_rate=self.learning_rate, weight_decay=0.001)
        # Adam = tf.keras.optimizers.Adam(learning_rate=self.learning_rate)
        neural_net = tf.keras.Sequential()
        neural_net.add(tf.keras.layers.Dense(units=128, activation='relu', input_dim=STATE_SIZE))
        neural_net.add(tf.keras.layers.Dense(units=256, activation='relu'))
        neural_net.add(tf.keras.layers.Dense(units=128, activation='relu'))
        neural_net.add(tf.keras.layers.Dense(units=64, activation='relu'))
        neural_net.add(tf.keras.layers.Dense(units=ACTION_SIZE, activation='elu'))
        neural_net.compile(optimizer=rectifiedAdam, loss='mse', metrics=['accuracy'])
        return neural_net
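
    # Q-value layout used throughout this class: the network's ACTION_SIZE = 6 outputs
    # are read as two branches of three values each, indices 0-2 for the X-axis action
    # (-1, 0, +1) and indices 3-5 for the Z-axis action (-1, 0, +1).
    # select_action() and training() below index into the outputs with this convention.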

    def select_action(self, state):
        # Predict the action Q-values and select an action
        random_num = np.random.sample()
        if random_num > self.epsilon:
            # Choose the action greedily from the main net
            predictResult = self.main_net(state).numpy()[0]
            actionX = np.argmax(predictResult[0:3]) - 1
            actionZ = np.argmax(predictResult[3:]) - 1
            action = np.array([actionX, actionZ], dtype=np.float32)
            # print("action = ", action)
        else:
            # Choose a random action
            actionX = np.random.randint(ACTION_SIZE // 2) - 1
            actionZ = np.random.randint(ACTION_SIZE // 2) - 1
            action = np.array([actionX, actionZ], dtype=np.float32)
        # Decay epsilon
        if self.epsilon > self.epsilon_min:
            self.epsilon -= self.epsilon_cut
        return action

    def training(self):
        # Train only once enough experience has been collected
        if self.exp_pool.get_len() >= BATCH_SIZE:
            # Sample a training batch
            exp_set = self.exp_pool.get_random(num=BATCH_SIZE)
            exp_state = [data[0] for data in exp_set]  # environment state recorded at that step
            exp_action = [data[1] for data in exp_set]  # action chosen at that step
            exp_reward = [data[2] for data in exp_set]  # reward received for that action
            exp_next_state = [data[3] for data in exp_set]  # environment state after the action
            exp_done = [data[4] for data in exp_set]  # whether the game ended after the action
            exp_state = np.asarray(exp_state).squeeze()
            exp_action = np.asarray(exp_action).squeeze()
            exp_next_state = np.asarray(exp_next_state).squeeze()

            # Predict Q-values with both networks
            target_net_q = self.target_net(exp_next_state).numpy()  # target net: Q-values of the next states
            main_net_q = self.main_net(exp_state).numpy()  # main net: Q-values of the current states

            # Training targets y, shape (BATCH_SIZE, ACTION_SIZE)
            y = main_net_q.copy()
            # Batch indices [0, 1, ..., BATCH_SIZE - 1]
            batch_index = np.arange(BATCH_SIZE, dtype=np.int32)
            # Map the action values (-1, 0, 1) to Q-value column indices
            # (X uses columns 0-2, Z uses columns 3-5; each index array has shape (BATCH_SIZE,))
            exp_actionX_index = (exp_action[:, 0] + 1).astype(int)
            exp_actionZ_index = (exp_action[:, 1] + 4).astype(int)

            # Maximum future Q-value of each branch, taken from the target net predictions
            fixedX = np.max(target_net_q[:, :3], axis=1)  # shape (BATCH_SIZE,)
            fixedZ = np.max(target_net_q[:, -3:], axis=1)  # shape (BATCH_SIZE,)
            # Bellman target: reward of that step plus the discounted maximum future Q-value
            fixedX = exp_reward + self.gamma * fixedX
            fixedZ = exp_reward + self.gamma * fixedZ
            # On terminal steps the target is the raw reward; otherwise use fixedX / fixedZ
            y_fixedX = np.where(exp_done, exp_reward, fixedX)
            y_fixedZ = np.where(exp_done, exp_reward, fixedZ)
            # Write the targets into y
            y[batch_index, exp_actionX_index] = y_fixedX
            y[batch_index, exp_actionZ_index] = y_fixedZ
            # Fit the main net
            self.main_net.fit(exp_state, y, epochs=5, verbose=0, callbacks=[tb_callback])

    def saveNN(self):
        # Save both networks
        main_save_dir = "ML-Model/" + datetime.datetime.now().strftime("%Y%m%d-%H%M%S") + "main.h5"
        target_save_dir = "ML-Model/" + datetime.datetime.now().strftime("%Y%m%d-%H%M%S") + "target.h5"
        self.main_net.save(main_save_dir)
        self.target_net.save(target_save_dir)
        print("Model Saved")

    def loadNN(self, main_load_dir, target_load_dir):
        # Load both networks
        main_net_loaded = tf.keras.models.load_model(main_load_dir)
        target_net_loaded = tf.keras.models.load_model(target_load_dir)
        print("Model Loaded")
        return main_net_loaded, target_net_loaded


if not args.train:
    agent = DQN(load=True, load_dir="ML-Model/" + "FinalNN-")
else:
    agent = DQN(load=False, load_dir="ML-Model/" + "20220205-051103")

total_steps = 0
steps_list = []
successTimes = 0
failTimes = 0
successTimes_his = []
failTimes_his = []
this10TimesWin = 0
MeanWinPerin10Times = []

for episode in range(EPISODES):
    # Episode start
    done = False  # reset the game-over flag
    steps = 0
    # Get the initial environment state
    env.reset()
    decision_steps, terminal_steps = env.get_steps(behavior_name)
    state = decision_steps.obs[0]
    state = np.reshape(state, [1, STATE_SIZE])
    # Game start
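    # Each pass through the loop below: pick an epsilon-greedy action with the main net,
    # wrap it in an ActionTuple and send it to the Unity environment, read the next state
    # and reward from decision_steps / terminal_steps, store the transition in the
    # experience pool, and run one training pass when the episode ends.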
    while True:
        reward = 0
        steps += 1
        total_steps += 1
        # Track the agent ID
        if tracked_agent == -1 and len(decision_steps) >= 1:
            tracked_agent = decision_steps.agent_id[0]
        # Every REPLACE_STEPS steps, copy the main net weights into the target net
        if total_steps % REPLACE_STEPS == 0 and total_steps != 0:
            agent.target_net.set_weights(agent.main_net.get_weights())
            print('target_net replaced')
        # Every SAVE_STEPS steps, save the networks
        if total_steps % SAVE_STEPS == 0 and total_steps != 0:
            agent.saveNN()
        # Select an action with the main net
        action = agent.select_action(state=state)
        continuous_actions = np.array([[0]], dtype=np.float32)
        discrete_actions = np.expand_dims(action, axis=0)
        # Convert the action into an ActionTuple that ML-Agents understands
        action_Tuple = ActionTuple(
            continuous=continuous_actions, discrete=discrete_actions)
        # Pass the action to the game environment
        env.set_actions(behavior_name=behavior_name, action=action_Tuple)
        env.step()
        # The environment executes the action and returns the next state
        decision_steps, terminal_steps = env.get_steps(behavior_name)
        if tracked_agent in decision_steps:
            # If the game has not ended, the state is stored in decision_steps
            next_state = decision_steps[tracked_agent].obs[0]
            next_state = np.reshape(next_state, [1, STATE_SIZE])
            reward = decision_steps[tracked_agent].reward
        if tracked_agent in terminal_steps:
            # If the game has ended, the state is stored in terminal_steps
            next_state = terminal_steps[tracked_agent].obs[0]
            next_state = np.reshape(next_state, [1, STATE_SIZE])
            reward = terminal_steps[tracked_agent].reward
            done = True
        # Store the transition in the experience pool
        agent.exp_pool.add(state, action, reward, next_state, done)
        # print("Reward = ", reward)
        # Move on to the next state
        state = next_state

        # Post-processing once the game ends
        if done:
            mean_steps = total_steps / (episode + 1)
            print("\nEpisode", episode, "done, Reward =", reward, "mean steps =", mean_steps,
                  "exp_num =", agent.exp_pool.get_len(), "\nepsilon =", agent.epsilon)
            agent.training()
            if reward >= 10:
                successTimes += 1
                this10TimesWin += 1
            else:
                failTimes += 1
            successTimes_his.append(successTimes)
            failTimes_his.append(failTimes)
            if episode % 10 == 0 and episode != 0:
                clear_output()
                this10TimesWinPer = float(this10TimesWin / 10)
                this10TimesWin = 0
                MeanWinPerin10Times.append(this10TimesWinPer)
                # Plot total wins (green) and total failures (red)
                if not args.silent:
                    plt.figure(figsize=(15, 10))
                    plt.plot(range(len(successTimes_his)), successTimes_his, color='green',
                             linestyle='--', linewidth=1, label='TotalWinTimes')
                    plt.plot(range(len(successTimes_his)), failTimes_his, color='red',
                             linewidth=1, marker='o', markersize=1, markerfacecolor='black',
                             markeredgecolor='black', label='TotalFailedTimes')
                    # plt.savefig('output.jpg')
                    plt.legend()
                    plt.savefig("wintimes.png")
                    plt.show()
                    # Plot the win rate over each block of 10 episodes
                    plt.figure(figsize=(15, 10))
                    plt.plot(MeanWinPerin10Times)
                    plt.savefig("steps.png")
                    plt.show()
            break

env.close()
print("Finished~")
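
# Example invocations, based on the argparse flags above (the script filename
# "dqn_scrollball.py" is illustrative, not taken from the original repository):
#   python dqn_scrollball.py                  # load the saved "FinalNN-" networks (epsilon stays at its minimum)
#   python dqn_scrollball.py -train           # build and train new networks from scratch
#   python dqn_scrollball.py -train -silent   # train without drawing matplotlib figures
#   python dqn_scrollball.py --env_path ./ScrollBall-Build/ML-ScrollBall-Sample -train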