Commit 7066ba78 by 20200318029

homework7

parent 4af6d4e1
#!/usr/bin/env python
# coding: utf-8
# In[16]:
import numpy as np
import math
import random
# In[17]:
"""
Class UCBArm creates the K LinUCB arms of our contextual bandit.
Via the LinUCB algorithm, our multi-arm bandit learns to maximize
the cumulative take-rate (CTR) while keeping regret low. Through
this optimization the bandit learns in an online manner,
sequentially updating its arms based on the observed reward for
the new context vector at each time step.
"""
class UCBArm(object):
"""
Initialization:
All arms at time step 0 are initialized with
a d x d design matrix 'A' set to the identity
matrix and a 'b' vector of zeros.
@param:
id - unique arm id (1...K) for each arm
d - length of context vector
alpha - exploration rate
"""
def __init__(self, id, d, alpha):
self.id = id
self.d = d
self.alpha = alpha
# Li lines 5-6
self.A = np.identity(self.d)
self.b = np.zeros((self.d,1))
"""
getUCB:
Calculates the arm's UCB given a context vector.
Assumes the expected payoff is linear in the
d-dimensional feature vector. Across all arms of
the bandit this amounts to ridge regression: the
arm to play for a given context is chosen by
comparing the UCBs of all arms.
@param:
x - context vector (1 x d)
@return:
ucb - upper confidence bound
"""
def getUCB(self, x):
Ainv = np.linalg.inv(self.A)
x = x.values[:, None] # pandas Series -> (d x 1) column vector
self.thetaHat = np.matmul(Ainv, self.b) # ridge-regression coefficient estimate
self.stdev = np.sqrt(np.matmul(np.matmul(x.T, Ainv), x)) # confidence width
self.ucb = np.matmul(self.thetaHat.T, x) + self.alpha * self.stdev
return self.ucb[0][0]
"""
update_arm:
Updates an arm's 'A' matrix and 'b' vector
based on observed reward and context vector
@param:
reward - reward for predicted action
x - context vector (1 x d)
"""
def update_arm(self, reward, x):
x = x.values[:, None] # pandas Series -> (d x 1) column vector
self.A += np.matmul(x, x.T) # accumulate outer product of contexts
self.b += reward * x # accumulate reward-weighted contexts
return None
"""
update_alpha:
Used to update alpha during the training process
@param:
method - alpha update rule (1, 2, or 3)
t - current time step, used to decay alpha (required for methods 1 and 3); defaults to None
"""
def update_alpha(self, method=1, t=None):
if method == 1:
self.alpha = 0.1 / np.sqrt(t + 1)
elif method == 2:
self.alpha = 0.1
elif method == 3:
self.alpha = 10 / (np.sqrt(t + 1) + 2 * t)
return None
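# Illustrative sketch (not part of the original assignment): the quantities a
# single UCBArm maintains, computed directly with numpy. With the identity
# prior A = I and b = 0, one observation of context x with reward r gives
# A = I + x x^T, b = r * x, theta_hat = A^{-1} b, and the arm's score is
# ucb = theta_hat^T x + alpha * sqrt(x^T A^{-1} x).
def _single_arm_ucb_sketch(x, r, alpha=0.1):
    x = np.asarray(x, dtype=float)[:, None]      # (d x 1) column vector
    A = np.identity(x.shape[0]) + x @ x.T        # design matrix after one update
    b = r * x                                    # reward-weighted context sum
    Ainv = np.linalg.inv(A)
    theta_hat = Ainv @ b                         # ridge-regression estimate
    return float(theta_hat.T @ x + alpha * np.sqrt(x.T @ Ainv @ x))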
# In[18]:
"""
Class LinUCB implements Li's LinUCB Algorithm [1]
for linear disjoint models for K-arm contextual
bandits.
"""
class LinUCB(object):
"""
Initialization:
Creates a bandit and initializes its K arms.
@param:
alpha - exploration rate
d - length of context vector
k - number of arms
arms: dictionary of UCBArms. Together they hold Da and ca
(and consequently Aa) from the original paper
"""
def __init__(self, alpha, d, k):
self.alpha = alpha
self.d = d #100
self.nArms = k #10
self.arms = self.init_arms()
"""
init_arms:
Initializes nArms instances of UCBArm
@return:
arms_dict - dictionary of arms of class UCBArm
"""
def init_arms(self):
arms_dict = {}
for id in range(1, self.nArms + 1):
arms_dict[id] = UCBArm(id, self.d, self.alpha)
return arms_dict
"""
get_ucbs:
Calculates ucb for all arms
@param:
x - context vector
@return:
ucbs - dictionary mapping arm id -> UCB value
"""
def get_ucbs(self, x):
ucbs = {}
for arm in self.arms:
ucbs[arm] = self.arms[arm].getUCB(x)
return ucbs
"""
choose_arm:
Returns id of arm with maximum ucb. Breaks ties
uniformly at random
@param:
ucbs - dictionary of ucbs for all arms
@return:
arm_id - id of arm with max ucb
"""
def choose_arm(self, ucbs):
# find the highest UCB, then collect all arms that attain it
max_ucb = max(ucbs.values())
max_ucb_ids = [arm for arm, ucb in ucbs.items() if ucb == max_ucb]
# break ties uniformly at random
return random.choice(max_ucb_ids)
"""
get_reward:
If the predicted 'arm' equals the true action
the reward is 1, else 0
@param:
arm - predicted action/arm for context
action - true observed action for context
@return:
reward - 1 if arm == action, else 0
"""
def get_reward(self, arm, action):
if arm == action:
return 1
return 0
"""
predict:
Helper function that calls the above functions
to predict an action based on a given context vector
@param:
x - context vector
@return:
pred_act - predicted action (arm id)
"""
def predict(self, x):
ucbs = self.get_ucbs(x)
pred_act = self.choose_arm(ucbs)
return pred_act
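# Illustrative usage sketch (assumption: context vectors arrive as pandas
# Series/rows, matching the .values access inside UCBArm). One round of the
# bandit: score every arm, play the arm with the highest UCB, observe the
# reward against the logged action, and update only the played arm.
def _demo_round(logged_action=1):
    import pandas as pd  # assumed available via the data-loading utilities
    bandit = LinUCB(alpha=0.1, d=3, k=2)
    x = pd.Series([1.0, 0.0, 0.5])              # toy 3-dimensional context
    arm = bandit.predict(x)                     # arm id with the maximum UCB
    reward = bandit.get_reward(arm, logged_action)
    bandit.arms[arm].update_arm(reward, x)      # only the chosen arm is updated
    return arm, reward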
# In[19]:
"""
Class bandit_evaluator implements unbiased offline evaluation
of our multi-arm contextual bandit following
Li, Chu, et al. [2]. At each time step (2...T)
we use the bandit trained through time t-1 to predict
the action for the context vector at time t. We evaluate
the bandit's cumulative take-rate over time.
"""
class bandit_evaluator(object):
"""
Initialization:
Creates an evaluator object to store bandit history
and calculate CTR
bandits: list to store our trained bandit history
cum_rewards: cumulative rewards earned
ctr_history: CTR history
"""
def __init__(self):
self.bandits = []
self.cum_rewards = 0
self.ctr_history = []
"""
calc_ctr:
Makes a prediction for the newly observed context at time t
using the bandit trained through t-1, collects the reward,
then calculates the CTR
@param:
x - context vector at time t
action - true action for x
t - current time step
@return:
ctr - cumulative take-rate
"""
def calc_ctr(self, x, action, t):
assert t > 0
bandit = self.bandits[-1] # most recent stored bandit (trained through t-1)
pred_act = bandit.predict(x)
# reward is 1 if the stored bandit picks the logged action, else 0
self.cum_rewards += bandit.get_reward(pred_act, action)
ctr = self.cum_rewards / t # cumulative take-rate after t evaluated steps
self.ctr_history.append(ctr)
return ctr
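# Worked example of the take-rate bookkeeping (illustrative numbers): if the
# stored bandits matched the logged action on 3 of the first 4 evaluated
# steps, then cum_rewards = 3 and calc_ctr reports 3 / 4 = 0.75 at t = 4.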
# In[20]:
from utils import getData, getContext, getAction
"""
train:
Main driver function that implements LinUCB1
and trains our multi-arm contextual bandit
@param:
file - data file to use (see readme for example)
steps - number of time steps (i.e. total observations in data)
alpha - exploration rate
nArms - number of bandit arms (K in paper)
d - dimension of context vector
@return:
ctr_history - cumulative take-rate history
"""
def train(file, steps, alpha, nArms, d):
# read in data
data = getData(file)
# initialize K-arm bandit
bandit = LinUCB(alpha, d, nArms)
# initialize bandit evaluator
evaluator = bandit_evaluator()
for t in range(steps):
x = getContext(data, t)
action = getAction(data, t)
arm = bandit.predict(x)
reward = bandit.get_reward(arm, action)
bandit.arms[arm].update_arm(reward, x)
if t > 0: # explore various alpha update methods to improve CTR
bandit.arms[arm].update_alpha(method=2) # constant alpha; methods 1 and 3 also require t
#bandit.arms[arm].update_alpha(3, t)
if t > 0: # evaluate current bandit algorithm
evaluator.bandits.append(bandit)
ctr = evaluator.calc_ctr(x, action, t)
if t % 100 == 0:
print("Step:", t, end="")
print(" | CTR: {0:.02f}%".format(ctr))
return evaluator.ctr_history
# In[21]:
file = "classification.txt"
steps = 10000
alpha = .1
nArms = 10
dim = 100
ctr_history = train(file, steps, alpha, nArms, dim)
# In[23]:
# diagnostics
print(ctr_history)
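# Optional diagnostic (assumes matplotlib is installed; not required by the
# assignment): plot how the cumulative take-rate evolves over the run.
try:
    import matplotlib.pyplot as plt
    plt.plot(ctr_history)
    plt.xlabel("time step")
    plt.ylabel("cumulative take-rate")
    plt.title("LinUCB cumulative take-rate over time")
    plt.show()
except ImportError:
    pass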
# In[1]: