Commit d231fc7f by 20201219013

add DP and MC coursework (todo: update MC, improve efficiency)

parent d1c2a713
# coding=utf-8
from MDP import MDP
import numpy as np
import matplotlib.pyplot as plt
def plot_value_and_policy(values, policy):
    data = np.zeros((5, 5))
    plt.figure(figsize=(12, 4))

    plt.subplot(1, 2, 1)
    plt.title('Value')
    for y in range(data.shape[0]):
        for x in range(data.shape[1]):
            data[y][x] = values[(x, y)]
            plt.text(x + 0.5, y + 0.5, '%.4f' % data[y, x], horizontalalignment='center', verticalalignment='center')
    heatmap = plt.pcolor(data)
    plt.gca().invert_yaxis()
    plt.colorbar(heatmap)

    plt.subplot(1, 2, 2)
    plt.title('Policy')
    for y in range(5):
        for x in range(5):
            for action in policy[(x, y)]:
                if action == 'DRIBBLE_UP':
                    plt.annotate('', (x + 0.5, y), (x + 0.5, y + 0.5), arrowprops={'width': 0.1})
                if action == 'DRIBBLE_DOWN':
                    plt.annotate('', (x + 0.5, y + 1), (x + 0.5, y + 0.5), arrowprops={'width': 0.1})
                if action == 'DRIBBLE_RIGHT':
                    plt.annotate('', (x + 1, y + 0.5), (x + 0.5, y + 0.5), arrowprops={'width': 0.1})
                if action == 'DRIBBLE_LEFT':
                    plt.annotate('', (x, y + 0.5), (x + 0.5, y + 0.5), arrowprops={'width': 0.1})
                if action == 'SHOOT':
                    plt.text(x + 0.5, y + 0.5, action, horizontalalignment='center', verticalalignment='center')
    heatmap = plt.pcolor(data)
    plt.gca().invert_yaxis()
    plt.colorbar(heatmap)
    plt.show()
class BellmanDPSolver(object):
    def __init__(self, discountRate=0.9):
        self.MDP = MDP()
        self.discountRate = discountRate
        self.initVs()

    def initVs(self):
        self.V = {}
        self.policy = {}
        for state in self.MDP.S:
            self.V[state] = 0.0
            # start from a uniform random policy over the five actions
            self.policy[state] = np.ones(len(self.MDP.A)) / len(self.MDP.A)

    def BellmanUpdate(self):
        # There are 27 states in total (25 grid cells plus GOAL and OUT);
        # each sweep updates V[state] for every one of them.
        updatedStateValue = self.V.copy()
        for state in self.MDP.S:
            tmp_V = [0.0] * len(self.MDP.A)  # one backup value per action
            for idx, action in enumerate(self.MDP.A):
                transitions = self.MDP.probNextStates(state, action)
                # The backup is computed the same way as the action-values below,
                # because transitions are only defined per action; there is no
                # separate state-transition matrix.
                for newState, prob in transitions.items():
                    reward = self.MDP.getRewards(state, action, newState)
                    tmp_V[idx] += prob * (reward + self.discountRate * self.V[newState])
            updatedStateValue[state] = np.max(tmp_V)
        self.V = updatedStateValue

        policy = {}
        for state in self.MDP.S:
            action_value = np.zeros(len(self.MDP.A))
            for idx, action in enumerate(self.MDP.A):
                transitions = self.MDP.probNextStates(state, action)
                for new_state, prob in transitions.items():
                    reward = self.MDP.getRewards(state, action, new_state)
                    action_value[idx] += prob * (reward + self.discountRate * self.V[new_state])
            self.policy[state] = np.zeros(len(self.MDP.A))  # reset before redistributing probability
            max_actions = np.argwhere(action_value == np.amax(action_value)).flatten().tolist()
            max_actions = np.sort(max_actions)
            # Greedy policy: probability is split evenly among all maximising
            # actions (no epsilon-greedy exploration here).
            prob_action = 1.0 / len(max_actions)
            self.policy[state][max_actions] = prob_action
            policy[state] = np.array(self.MDP.A)[max_actions].tolist()  # action names, for plotting
        return self.V, policy
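
# Hedged sketch (added; not part of the original coursework interface): value
# iteration can also be run until the value function stops changing instead of
# for a fixed number of sweeps. `run_to_convergence`, `theta` and `max_sweeps`
# are illustrative names and assumptions, not requirements of the assignment.
def run_to_convergence(solver, theta=1e-8, max_sweeps=1000):
    values, policy = solver.BellmanUpdate()
    for _ in range(max_sweeps):
        old_values = dict(values)
        values, policy = solver.BellmanUpdate()
        # stop once the largest per-state change falls below the tolerance
        if max(abs(values[s] - old_values[s]) for s in values) < theta:
            break
    return values, policy
# Example usage: values, policy = run_to_convergence(BellmanDPSolver())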
if __name__ == '__main__':
    solution = BellmanDPSolver()
    for i in range(100):
        values, policy = solution.BellmanUpdate()
    print("Values : ", values)
    print("Policy : ", policy)
    plot_value_and_policy(values, policy)
# coding=utf-8
class MDP(object):
    def __init__(self):
        # Possible states are elements of [0,1,...,4] x [0,1,...,4]
        # plus two additional states to indicate GOAL and OUT (wayward kicks)
        self.S = [(x, y) for x in range(5) for y in range(5)]
        self.S.append("GOAL")
        self.S.append("OUT")

        # Agent's possible actions
        self.A = ["DRIBBLE_UP", "DRIBBLE_DOWN", "DRIBBLE_LEFT", "DRIBBLE_RIGHT", "SHOOT"]

        # Opposition locations
        self.oppositions = [(2, 2), (4, 2)]

        # Probability of scoring from locations on the pitch.
        # Each inner list of goalProbs holds the scoring probabilities for the
        # cells of one column, starting from the leftmost column.
        self.goalProbs = [[0.0, 0.0, 0.0, 0.0, 0.0],
                          [0.0, 0.0, 0.0, 0.0, 0.0],
                          [0.0, 0.0, 0.0, 0.0, 0.0],
                          [0.0, 0.3, 0.5, 0.3, 0.0],
                          [0.0, 0.8, 0.9, 0.8, 0.0]]
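
    # Worked example (added note): goalProbs is indexed as goalProbs[x][y], so a
    # SHOOT from cell (4, 2), the centre of the rightmost column, scores with
    # probability goalProbs[4][2] = 0.9, while shots from the three leftmost
    # columns never score.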
    def getRewards(self, initState, Action, nextState):
        """ Return R(s,a,s') for the MDP.

        Keyword Arguments:
        initState -- The current state s.
        Action    -- The chosen action a in state s.
        nextState -- The next state s'.
        """
        if nextState == "GOAL":  # reward for scoring a goal
            if initState != "GOAL":
                return 1
            else:
                return 0
        elif nextState == "OUT":  # penalty for a wayward kick
            if initState != "OUT":
                return -1
            else:
                return 0
        elif nextState in self.oppositions:  # penalty for bumping into the opposition at (2,2) or (4,2)
            return -0.5
        else:
            return 0
    def probNextStates(self, initState, action):
        """ Return the next-state probabilities for the MDP as a dictionary.

        Keyword Arguments:
        initState -- The current state s.
        action    -- The chosen action a in state s.
        """
        nextStateProbs = {}
        if initState != "GOAL" and initState != "OUT":
            if action != "SHOOT":
                possibleDestinations = [(initState[0], max(0, initState[1] - 1)),
                                        (initState[0], min(4, initState[1] + 1)),
                                        (max(0, initState[0] - 1), initState[1]),
                                        (min(4, initState[0] + 1), initState[1])]
                if action == "DRIBBLE_UP":
                    intendedDestination = (initState[0], max(0, initState[1] - 1))
                elif action == "DRIBBLE_DOWN":
                    intendedDestination = (initState[0], min(4, initState[1] + 1))
                elif action == "DRIBBLE_LEFT":
                    intendedDestination = (max(0, initState[0] - 1), initState[1])
                else:
                    intendedDestination = (min(4, initState[0] + 1), initState[1])
                nextStateProbs[intendedDestination] = 0.8
                # Each of the four (clipped) neighbours gets an extra 0.05, so the
                # intended destination ends up with 0.85 and the probabilities
                # always sum to 1, even when clipped destinations coincide.
                for destination in possibleDestinations:
                    if destination not in nextStateProbs:
                        nextStateProbs[destination] = 0.0
                    nextStateProbs[destination] += 0.05
            else:
                nextStateProbs["GOAL"] = self.goalProbs[initState[0]][initState[1]]
                nextStateProbs["OUT"] = 1.0 - nextStateProbs["GOAL"]
        elif initState == "GOAL":
            nextStateProbs["GOAL"] = 1.0
        else:
            nextStateProbs["OUT"] = 1.0
        return nextStateProbs
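
# Hedged sanity-check sketch (added; not part of the original file): at an
# interior cell a dribble reaches the intended neighbour with probability 0.85
# and each other neighbour with 0.05; at the pitch boundary clipped destinations
# coincide and their probability mass accumulates, so the distribution still
# sums to 1.
if __name__ == "__main__":
    mdp = MDP()
    for state, action in [((2, 2), "DRIBBLE_UP"), ((0, 0), "DRIBBLE_LEFT")]:
        probs = mdp.probNextStates(state, action)
        print(state, action, probs, "sum =", sum(probs.values()))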
#!/usr/bin/env python3
# coding=utf-8
# from DiscreteHFO.HFOAttackingPlayer import HFOAttackingPlayer
# from DiscreteHFO.Agent import Agent
import argparse
from collections import defaultdict
import numpy as np
from hfoEnv import hfoEnv
class MonteCarloAgent(object):
    def __init__(self, discountFactor, epsilon, initVals=0.0):
        super(MonteCarloAgent, self).__init__()
        self.discountFactor = discountFactor
        self.epsilon = epsilon
        self.currentState = None
        self.actions = [
            "DRIBBLE_UP",
            "DRIBBLE_DOWN",
            "DRIBBLE_LEFT",
            "DRIBBLE_RIGHT",
            "SHOOT",
        ]
        self.Qs = defaultdict(lambda: np.ones(len(self.actions)) * initVals)
        self.random = np.random.RandomState(0)
        self.policy = defaultdict(lambda: np.ones(len(self.actions)) / len(self.actions))
        self.state_action_count = defaultdict(lambda: np.zeros(len(self.actions)))
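
    # Note (added): self.policy maps a state representation to a length-5
    # probability vector over self.actions. It starts uniform and is re-estimated
    # in learn() as an epsilon-greedy distribution: 1 - epsilon + epsilon/|A| on
    # the greedy action and epsilon/|A| on each of the other actions.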
    def learn(self):
        # per-episode first-visit bookkeeping; default value is [0, 0, 0, 0, 0]
        state_action_count = defaultdict(lambda: np.zeros(len(self.actions)))
        states = self.cache["states"]
        actions = self.cache["actions"]
        rewards = self.cache["rewards"]
        statuses = self.cache["statuses"]
        nextStates = self.cache["nextStates"]
        results = []
        for idx, (state, action, _, _, _) in enumerate(
            zip(states, actions, rewards, statuses, nextStates)
        ):
            action_idx = self.actions.index(action)
            # first-visit MC: only the first occurrence of (state, action) in the
            # episode contributes to the estimate
            if state_action_count[state][action_idx] != 0:
                continue
            state_action_count[state][action_idx] += 1
            self.state_action_count[state][action_idx] += 1
            # the return for this visit is the discounted sum of all rewards from
            # this step onwards (O(n^2) per episode; see the backward-pass sketch
            # after this method)
            rewards_after_state_action = rewards[idx:]
            discounted_rewards = sum(
                self.discountFactor ** k * reward
                for k, reward in enumerate(rewards_after_state_action)
            )
            # incremental mean update of Q(s, a) over all visits so far
            old_Q = self.Qs[state][action_idx]
            self.Qs[state][action_idx] = old_Q + (discounted_rewards - old_Q) * (
                1 / self.state_action_count[state][action_idx]
            )
            results.append(self.Qs[state][action_idx])

        # epsilon-greedy policy improvement from the updated Q-values
        for state, Q in self.Qs.items():
            greedy_action = np.argmax(Q)
            for a in range(len(self.actions)):
                if a == greedy_action:
                    self.policy[state][a] = (
                        1 - self.epsilon + self.epsilon / len(self.actions)
                    )
                else:
                    self.policy[state][a] = self.epsilon / len(self.actions)
        return self.Qs, results
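
    # Efficiency sketch (added; not part of the original agent): the per-visit
    # return above is recomputed from scratch for every first visit, which is
    # O(n^2) per episode. A single backward pass gives every return in O(n);
    # `_discounted_returns` is a hypothetical helper illustrating that idea and
    # is not called by the coursework code.
    @staticmethod
    def _discounted_returns(rewards, discountFactor):
        returns = [0.0] * len(rewards)
        G = 0.0
        for t in reversed(range(len(rewards))):
            G = rewards[t] + discountFactor * G  # G_t = r_t + gamma * G_{t+1}
            returns[t] = G
        return returns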
    def toStateRepresentation(self, state):
        return str(state)

    def setExperience(self, state, action, reward, status, nextState):
        self.cache["states"].append(state)
        self.cache["actions"].append(action)
        self.cache["rewards"].append(reward)
        self.cache["statuses"].append(status)
        self.cache["nextStates"].append(nextState)

    def setState(self, state):
        self.currentState = state

    def reset(self):
        self.cache = {
            "states": [],
            "actions": [],
            "rewards": [],
            "statuses": [],
            "nextStates": []
        }

    def act(self):
        # sample the next action from the current (epsilon-greedy) policy
        idx = np.random.choice(len(self.actions), size=1, p=self.policy[self.currentState])[0]
        return self.actions[idx]

    def setEpsilon(self, epsilon):
        self.epsilon = epsilon
    def computeHyperparameters(self, numTakenActions, episodeNumber):
        """Compute the epsilon actually used for this step.

        After episode 4000 the returned epsilon is 0.0, so action selection
        becomes greedy; before that, epsilon decreases slightly on every call
        (explore up to episode 4000, exploit afterwards).
        """
        return 0.0 if episodeNumber > 4000 else max(0, self.epsilon - (1.0 / 4000))
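
    # Worked example (added; assumes the initial epsilon of 0.6 set in __main__
    # below): each call subtracts 1/4000 = 0.00025 from the current epsilon, so
    # it reaches 0 after 0.6 / 0.00025 = 2400 calls, and from episode 4001
    # onwards the schedule returns 0.0 regardless of the current value.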
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--id", type=int, default=0)
    parser.add_argument("--numOpponents", type=int, default=0)
    parser.add_argument("--numTeammates", type=int, default=0)
    parser.add_argument("--numEpisodes", type=int, default=500)
    args = parser.parse_args()

    # Init connections to the HFO server (disabled; the grid-world hfoEnv is used instead)
    # hfoEnv = HFOAttackingPlayer(
    #     numOpponents=args.numOpponents, numTeammates=args.numTeammates, agentId=args.id
    # )
    # hfoEnv.connectToServer()
    env = hfoEnv()

    log_freq = 50
    rewards_list = []
    rewards = 0

    # Init MonteCarloAgent
    agent = MonteCarloAgent(discountFactor=0.99, epsilon=0.6)
    numEpisodes = args.numEpisodes
    numTakenActions = 0
    print(numEpisodes)

    for episode in range(numEpisodes):
        agent.reset()
        observation = env.reset()
        status = 0

        # forward pass: record one episode (status becomes 1 on GOAL or OUT)
        while status == 0:
            epsilon = agent.computeHyperparameters(numTakenActions, episode)
            agent.setEpsilon(epsilon)
            agent.setState(agent.toStateRepresentation(observation))
            action = agent.act()
            numTakenActions += 1
            nextObservation, reward, done, status = env.step(action, observation)
            agent.setExperience(
                agent.toStateRepresentation(observation),
                action,
                reward,
                status,
                agent.toStateRepresentation(nextObservation)
            )
            observation = nextObservation
            rewards += reward

        # backward pass: learn from the recorded episode
        agent.learn()

        if (episode + 1) % log_freq == 0:
            print(episode + 1)
            print(rewards)
            print("rewards %f" % (rewards / log_freq))
            rewards_list.append(rewards / log_freq)
            rewards = 0

    # plot the average reward per logging window
    import matplotlib.pyplot as plt
    print(len(list(range(0, numEpisodes, log_freq))))
    print(len(rewards_list))
    plt.plot(list(range(0, numEpisodes, log_freq)), rewards_list)
    plt.savefig("MonteCarlo.png")
    plt.show()
import numpy as np
class hfoEnv(object):
    def __init__(self):
        # Possible states are elements of [0,1,...,4] x [0,1,...,4]
        # plus two additional states to indicate GOAL and OUT (wayward kicks)
        self.S = [(x, y) for x in range(5) for y in range(5)]
        self.S.append("GOAL")
        self.S.append("OUT")

        # Agent's possible actions
        self.A = ["DRIBBLE_UP", "DRIBBLE_DOWN", "DRIBBLE_LEFT", "DRIBBLE_RIGHT", "SHOOT"]

        # Opposition locations
        self.oppositions = [(2, 2), (4, 2)]

        # Probability of scoring from locations on the pitch.
        # Each inner list of goalProbs holds the scoring probabilities for the
        # cells of one column, starting from the leftmost column.
        self.goalProbs = [[0.0, 0.0, 0.0, 0.0, 0.0],
                          [0.0, 0.0, 0.0, 0.0, 0.0],
                          [0.0, 0.0, 0.0, 0.0, 0.0],
                          [0.0, 0.3, 0.5, 0.3, 0.0],
                          [0.0, 0.8, 0.9, 0.8, 0.0]]
    def reset(self):
        # reset just returns a new random initial state (it may be GOAL or OUT,
        # in which case the episode ends after a single step)
        return self.S[np.random.randint(len(self.S))]
    def getRewards(self, initState, Action, nextState):
        """ Return R(s,a,s') for the MDP.

        Keyword Arguments:
        initState -- The current state s.
        Action    -- The chosen action a in state s.
        nextState -- The next state s'.
        """
        if nextState == "GOAL":  # reward for scoring a goal
            if initState != "GOAL":
                return 1
            else:
                return 0
        elif nextState == "OUT":  # penalty for a wayward kick
            if initState != "OUT":
                return -1
            else:
                return 0
        elif nextState in self.oppositions:  # penalty for bumping into the opposition at (2,2) or (4,2)
            return -0.5
        else:
            return 0
    def probNextStates(self, initState, action):
        """ Return the next-state probabilities for the MDP as a dictionary.

        Keyword Arguments:
        initState -- The current state s.
        action    -- The chosen action a in state s.
        """
        nextStateProbs = {}
        if initState != "GOAL" and initState != "OUT":
            if action != "SHOOT":
                possibleDestinations = [(initState[0], max(0, initState[1] - 1)),
                                        (initState[0], min(4, initState[1] + 1)),
                                        (max(0, initState[0] - 1), initState[1]),
                                        (min(4, initState[0] + 1), initState[1])]
                if action == "DRIBBLE_UP":
                    intendedDestination = (initState[0], max(0, initState[1] - 1))
                elif action == "DRIBBLE_DOWN":
                    intendedDestination = (initState[0], min(4, initState[1] + 1))
                elif action == "DRIBBLE_LEFT":
                    intendedDestination = (max(0, initState[0] - 1), initState[1])
                else:
                    intendedDestination = (min(4, initState[0] + 1), initState[1])
                nextStateProbs[intendedDestination] = 0.8
                # Each of the four (clipped) neighbours gets an extra 0.05, which
                # keeps the probabilities summing to 1 even when clipped
                # destinations coincide.
                for destination in possibleDestinations:
                    if destination not in nextStateProbs:
                        nextStateProbs[destination] = 0.0
                    nextStateProbs[destination] += 0.05
            else:
                nextStateProbs["GOAL"] = self.goalProbs[initState[0]][initState[1]]
                nextStateProbs["OUT"] = 1.0 - nextStateProbs["GOAL"]
        elif initState == "GOAL":
            nextStateProbs["GOAL"] = 1.0
        else:
            nextStateProbs["OUT"] = 1.0
        return nextStateProbs
    def step(self, action, initState):
        # Deterministic transition: always move to the most probable next state
        # rather than sampling from the distribution.
        nextStateProbs = self.probNextStates(initState, action)
        nextObservation = max(nextStateProbs, key=nextStateProbs.get)
        reward = self.getRewards(initState, action, nextObservation)
        if nextObservation == "GOAL" or nextObservation == "OUT":
            status = 1
        else:
            status = 0
        return nextObservation, reward, None, status
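
    # Hedged sketch (added; not the coursework API): step() above always returns
    # the single most likely next state, so the environment behaves
    # deterministically. `sampleStep` is a hypothetical stochastic alternative
    # that samples the next state from probNextStates instead.
    def sampleStep(self, action, initState):
        nextStateProbs = self.probNextStates(initState, action)
        states = list(nextStateProbs.keys())
        probs = list(nextStateProbs.values())
        # sample an index according to the transition probabilities
        idx = np.random.choice(len(states), p=probs)
        nextObservation = states[idx]
        reward = self.getRewards(initState, action, nextObservation)
        status = 1 if nextObservation in ("GOAL", "OUT") else 0
        return nextObservation, reward, None, status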