Commit f4152bc1 by 20201219013

Q-Learning & other updates

parent d231fc7f
__pycache__
*.pyc
.idea
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# from DiscreteHFO.HFOAttackingPlayer import HFOAttackingPlayer
# from DiscreteHFO.Agent import Agent
import argparse
import numpy as np
from collections import defaultdict
import matplotlib.pyplot as plt
from hfoEnv import hfoEnv


class QLearningAgent(object):
    def __init__(self, learningRate, discountFactor, epsilon, initVals=0.0):
        super(QLearningAgent, self).__init__()
        self.learningRate = learningRate
        self.discountFactor = discountFactor
        self.epsilon = epsilon
        self.currentState = None
        self.actions = ['DRIBBLE_UP', 'DRIBBLE_DOWN', 'DRIBBLE_LEFT', 'DRIBBLE_RIGHT', 'SHOOT']
        # Q-table: state representation -> array of action values, initialised to initVals.
        self.Qs = defaultdict(lambda: np.ones(len(self.actions)) * initVals)
        # Fixed seed so exploration is reproducible across runs.
        self.random = np.random.RandomState(0)

    def learn(self):
        # One tabular Q-learning update from the most recently stored transition:
        #   Q(s, a) <- Q(s, a) + alpha * (r + gamma * max_a' Q(s', a') - Q(s, a))
        (state, action, reward, status, nextState) = self.cache
        # Terminal transitions (status != 0) bootstrap from a value of zero.
        max_Q_next = 0.0 if status != 0 else np.max(self.Qs[nextState])
        action = self.actions.index(action)
        error = self.learningRate * (
            reward + self.discountFactor * max_Q_next
            - self.Qs[state][action]
        )
        self.Qs[state][action] = self.Qs[state][action] + error
        # Return the scaled TD error (alpha * delta) that was applied to the Q-value.
        return error

    def act(self):
        # Epsilon-greedy: exploit the best known action with probability 1 - epsilon,
        # otherwise pick an action uniformly at random.
        p = self.random.uniform()
        Q = self.Qs[self.currentState]
        if p >= self.epsilon:
            return self.actions[np.argmax(Q)]
        else:
            return self.actions[self.random.choice(len(self.actions), size=1, replace=False)[0]]

    def toStateRepresentation(self, state):
        # States are keyed by their string form so they can index the defaultdict.
        return str(state)

    def setState(self, state):
        self.currentState = state

    def setExperience(self, state, action, reward, status, nextState):
        # Store the latest (s, a, r, status, s') transition for the next learn() call.
        self.cache = (state, action, reward, status, nextState)

    def setLearningRate(self, learningRate):
        self.learningRate = learningRate

    def setEpsilon(self, epsilon):
        self.epsilon = epsilon

    def reset(self):
        pass

    def computeHyperparameters(self, numTakenActions, episodeNumber):
        # Keep the learning rate fixed; decay epsilon linearly by 1/4000 per action,
        # and force it to zero once more than 4000 episodes have been played.
        return self.learningRate, 0.0 if episodeNumber > 4000 else max(0, self.epsilon - (1.0 / 4000))
        # return ((1e-3 - 1.0)/4000)*episodeNumber + 1.0, ((0.0 - 1.0)/4000)*episodeNumber + 1.0


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--id', type=int, default=0)
    parser.add_argument('--numOpponents', type=int, default=0)
    parser.add_argument('--numTeammates', type=int, default=0)
    parser.add_argument('--numEpisodes', type=int, default=500)
    args = parser.parse_args()

    # Initialize connection with the HFO server
    # hfoEnv = HFOAttackingPlayer(numOpponents = args.numOpponents, numTeammates = args.numTeammates, agentId = args.id)
    # hfoEnv.connectToServer()
    hfoEnv = hfoEnv()

    # Initialize a Q-Learning Agent
    agent = QLearningAgent(learningRate=0.1, discountFactor=0.99, epsilon=0.6)
    numEpisodes = args.numEpisodes
    log_freq = 50
    rewards_list = []
    rewards = 0.0

    # Run training using Q-Learning
    numTakenActions = 0
    for episode in range(numEpisodes):
        status = 0
        observation = hfoEnv.reset()
        while status == 0:
            learningRate, epsilon = agent.computeHyperparameters(numTakenActions, episode)
            agent.setEpsilon(epsilon)
            agent.setLearningRate(learningRate)

            obsCopy = [observation].copy()[0]
            agent.setState(agent.toStateRepresentation(obsCopy))
            action = agent.act()
            numTakenActions += 1

            nextObservation, reward, done, status = hfoEnv.step(action, obsCopy)
            agent.setExperience(agent.toStateRepresentation(obsCopy), action, reward, status,
                                agent.toStateRepresentation(nextObservation))
            update = agent.learn()

            # Count an episode as successful only when it terminates with a goal;
            # episodes that end OUT contribute nothing to the running tally.
            if status == 1 and nextObservation == "GOAL":
                rewards += 1.0
            observation = nextObservation

        # Every log_freq episodes, report and plot the fraction of episodes that scored.
        if (episode + 1) % log_freq == 0:
            print(episode + 1)
            print("rewards %f" % (rewards / log_freq))
            rewards_list.append(rewards / log_freq)
            rewards = 0
            plt.cla()
            plt.plot(rewards_list)
            plt.pause(0.01)

    print(rewards_list)
    plt.cla()
    plt.plot(rewards_list)
    plt.savefig("Q-Learning.png")
    plt.show()
import numpy as np


class hfoEnv(object):
    def __init__(self):
        # Possible states are elements of [0, 1, ..., 4] x [0, 1, ..., 4],
        # plus two additional states indicating GOAL and OUT (wayward kicks).
        self.S = [(x, y) for x in range(5) for y in range(5)]
        self.S.append("GOAL")
        self.S.append("OUT")
        # Agent's possible actions
        self.A = ["DRIBBLE_UP", "DRIBBLE_DOWN", "DRIBBLE_LEFT", "DRIBBLE_RIGHT", "SHOOT"]
        # Opposition locations
        self.oppositions = [(2, 2), (4, 2)]
        # Probability of scoring from locations on the pitch.
        # Each inner list holds the scoring probabilities for the grids in one
        # column, starting from the leftmost column.
        self.goalProbs = [[0.0, 0.0, 0.0, 0.0, 0.0],
                          [0.0, 0.0, 0.0, 0.0, 0.0],
                          [0.0, 0.0, 0.0, 0.0, 0.0],
                          [0.0, 0.3, 0.5, 0.3, 0.0],
                          [0.0, 0.8, 0.9, 0.8, 0.0]]

    def reset(self):
        # reset() just returns a new, uniformly sampled initial position.
        return self.S[np.random.randint(len(self.S))]

    def getRewards(self, initState, action, nextState):
        """ Return R(s, a, s') for the MDP.

        Keyword Arguments:
        initState -- The current state s.
        action -- The action a chosen in state s.
        nextState -- The next state s'.
        """
        if nextState == "GOAL":  # Reward for managing to score a goal
            if initState != "GOAL":
                return 1
            else:
                return 0
        elif nextState == "OUT":  # Penalty for kicking the ball out
            if initState != "OUT":
                return -1
            else:
                return 0
        elif nextState in self.oppositions:  # Penalty for bumping into the opposition placed at (2,2) and (4,2)
            return -0.5
        else:
            return 0

    def probNextStates(self, initState, action):
        """ Return the next-state probabilities P(s' | s, a) for the MDP as a dictionary.

        Keyword Arguments:
        initState -- The current state s.
        action -- The chosen action a in state s.
        """
        nextStateProbs = {}
        if initState != "GOAL" and initState != "OUT":
            if action != "SHOOT":
                # The four grid cells reachable by dribbling, clamped to the pitch.
                possibleDestinations = [(initState[0], max(0, initState[1] - 1)),
                                        (initState[0], min(4, initState[1] + 1)),
                                        (max(0, initState[0] - 1), initState[1]),
                                        (min(4, initState[0] + 1), initState[1])]
                if action == "DRIBBLE_UP":
                    intendedDestination = (initState[0], max(0, initState[1] - 1))
                elif action == "DRIBBLE_DOWN":
                    intendedDestination = (initState[0], min(4, initState[1] + 1))
                elif action == "DRIBBLE_LEFT":
                    intendedDestination = (max(0, initState[0] - 1), initState[1])
                else:
                    intendedDestination = (min(4, initState[0] + 1), initState[1])
                # The intended cell gets probability 0.8 and each candidate adds 0.05,
                # so the probabilities still sum to 1 when destinations coincide at the borders.
                nextStateProbs[intendedDestination] = 0.8
                for destination in possibleDestinations:
                    if destination not in nextStateProbs.keys():
                        nextStateProbs[destination] = 0.0
                    nextStateProbs[destination] += 0.05
            else:
                # Shooting either scores (with the location-dependent goal probability) or goes out.
                nextStateProbs["GOAL"] = self.goalProbs[initState[0]][initState[1]]
                nextStateProbs["OUT"] = 1.0 - nextStateProbs["GOAL"]
        elif initState == "GOAL":
            nextStateProbs["GOAL"] = 1.0
        else:
            nextStateProbs["OUT"] = 1.0
        return nextStateProbs

    def step(self, action, initState):
        nextStateProbs = self.probNextStates(initState, action)
        # Note: this deterministically picks the single most probable next state
        # rather than sampling from the transition distribution.
        nextObservation = list(nextStateProbs.keys())[list(nextStateProbs.values()).index(max(nextStateProbs.values()))]
        reward = self.getRewards(initState, action, nextObservation)
        # status 1 marks a terminal transition (goal scored or ball out of play).
        if nextObservation == "GOAL" or nextObservation == "OUT":
            status = 1
        else:
            status = 0
        # The third element (a done flag) is unused and returned as None.
        return nextObservation, reward, None, status
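

# A quick sanity check of the transition model above: a minimal, illustrative usage
# sketch. The chosen states and actions are arbitrary examples, not taken from the assignment.
if __name__ == '__main__':
    env = hfoEnv()
    # Dribbling up from the corner (0, 0): the clamped "up" and "left" moves both land
    # back on (0, 0), so it collects 0.8 + 0.05 + 0.05 of the probability mass,
    # while (0, 1) and (1, 0) get 0.05 each.
    print(env.probNextStates((0, 0), "DRIBBLE_UP"))
    # Shooting from (4, 2) scores with probability goalProbs[4][2] = 0.9, otherwise goes OUT.
    print(env.probNextStates((4, 2), "SHOOT"))
    # step() returns (nextState, reward, done, status); done is always None here.
    print(env.step("SHOOT", (4, 2)))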
@@ -2,8 +2,8 @@
Name: 武亚宁
Description: This is the coursework for Part 3, Reinforcement Learning.
Continuously updated; it covers:
Dynamic Programming (fixed a small issue in the assignment answers)
Monte Carlo Control (learning from an episode currently uses an O(n^2) algorithm; it will be updated to the O(n) version sketched below)
Q-Learning
Deep Reinforcement Learning
Explore and Exploit (Optional)
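
For the Monte Carlo item above, a minimal sketch of the usual O(n) approach it alludes to: compute the return of every step in one backward pass over the episode instead of re-summing future rewards per step. The function name and the (state, action, reward) episode format are illustrative assumptions, not taken from the repository.

def episodeReturns(episode, discountFactor):
    # Illustrative sketch (assumed interface): `episode` is a list of
    # (state, action, reward) tuples in time order.
    # One backward pass computes G_t = r_{t+1} + discountFactor * G_{t+1} for all t.
    returns = [0.0] * len(episode)
    G = 0.0
    for t in reversed(range(len(episode))):
        _, _, reward = episode[t]
        G = reward + discountFactor * G
        returns[t] = G
    return returns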