TensorFlow Federated
By Junyangz AT IIE, CAS.
Background
Much of the data is born decentralized (phones & IoT devices).
Learning is easy if the data is all in one place.
But centralization has disadvantages:
user experience
latency
offline
resource limits
data caps
battery life
privacy concerns
sensitive data
Learning on-device without collecting the data has its own problems:
often too little data per client
other clients aren't contributing
pre-training may help... sometimes
Learn together --- Federated Learning
Overview
TensorFlow Federated (TFF) is an open-source framework for machine learning and other computations on decentralized data. TFF has been developed to facilitate open research and experimentation with Federated Learning (FL), an approach to machine learning where a shared global model is trained across many participating clients that keep their training data locally. For example, FL has been used to train prediction models for mobile keyboards without uploading sensitive typing data to servers.
TFF enables developers to simulate the included federated learning algorithms on their models and data, as well as to experiment with novel algorithms. The building blocks provided by TFF can also be used to implement non-learning computations, such as aggregated analytics over decentralized data. TFF’s interfaces are organized in two layers:
Federated Learning (FL) API: This layer offers a set of high-level interfaces that allow developers to apply the included implementations of federated training and evaluation to their existing TensorFlow models.
Federated Core (FC) API: At the core of the system is a set of lower-level interfaces for concisely expressing novel federated algorithms by combining TensorFlow with distributed communication operators within a strongly-typed functional programming environment. This layer also serves as the foundation upon which we've built Federated Learning.
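For a taste of the FC layer, here is a minimal federated computation in the style of the TFF documentation's running example: a strongly-typed declaration that each client contributes a float and the server receives their average. (Illustrative only; the rest of these notes use the FL API.)
@tff.federated_computation(tff.FederatedType(tf.float32, tff.CLIENTS))
def get_average_temperature(sensor_readings):
  # A federated op: average the clients' values, placing the result at the server.
  return tff.federated_mean(sensor_readings)

get_average_temperature([68.5, 70.3, 69.8])  # runs in the local simulation runtime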
Federated Learning (FL) API layer
tff.learning
A set of higher-level interfaces that can be used to perform common types of federated learning tasks, such as federated training, with user-provided models already implemented in TensorFlow.
Preparing the input data
The TFF repository ships with several datasets, including LEAF, a federated-learning benchmark dataset produced at CMU. Since every writer has a distinctive style, this dataset exhibits non-i.i.d. characteristics, which is exactly what one expects of federated data.
#@test {"output": "ignore"}
emnist_train, emnist_test = tff.simulation.datasets.emnist.load_data()
emnist_train.output_types, emnist_train.output_shapes
#(OrderedDict([(u'label', tf.int32), (u'pixels', tf.float32)]), OrderedDict([(u'label', TensorShape([])), (u'pixels', TensorShape([28, 28]))]))
example_dataset = emnist_train.create_tf_dataset_for_client(
    emnist_train.client_ids[0])
example_element = next(iter(example_dataset))
example_element['label'].numpy()
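To see the non-i.i.d. (and unbalanced) structure directly, one can compare how many examples individual clients hold; a quick sketch, assuming eager execution as in the tutorial environment, with counts varying from writer to writer:
# Per-client dataset sizes differ: federated data is unbalanced as well as non-i.i.d.
for client_id in emnist_train.client_ids[:5]:
  client_dataset = emnist_train.create_tf_dataset_for_client(client_id)
  print(client_id, sum(1 for _ in client_dataset))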
Preprocessing flattens the 28x28 images in the MNIST data into 784-element arrays. Since the data is already a tf.data.Dataset, this can be accomplished with dataset transformations: shuffle the individual examples, organize them into batches, and rename the features from pixels and label to x and y for use with Keras. We also repeat over the dataset to run several epochs.
NUM_EPOCHS = 10
BATCH_SIZE = 20
SHUFFLE_BUFFER = 500
def preprocess(dataset):

  def element_fn(element):
    return collections.OrderedDict([
        ('x', tf.reshape(element['pixels'], [-1])),
        ('y', tf.reshape(element['label'], [1])),
    ])

  return dataset.repeat(NUM_EPOCHS).map(element_fn).shuffle(
      SHUFFLE_BUFFER).batch(BATCH_SIZE)

def make_federated_data(client_data, client_ids):
  return [preprocess(client_data.create_tf_dataset_for_client(x))
          for x in client_ids]
#@test {"output": "ignore"}
NUM_CLIENTS = 3
sample_clients = emnist_train.client_ids[0:NUM_CLIENTS]
federated_train_data = make_federated_data(emnist_train, sample_clients)
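The tff.learning.from_compiled_keras_model wrapper used below needs a sample batch to infer the model's input types. Following the original tutorial, one can be built from the preprocessed data of the first client:
preprocessed_example_dataset = preprocess(example_dataset)
# Take one batch and convert its tensors to NumPy arrays.
sample_batch = tf.nest.map_structure(
    lambda x: x.numpy(), next(iter(preprocessed_example_dataset)))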
Creating a model with Keras
Note that the optimizer and learning rate chosen here differ from what you would pick for an i.i.d. dataset.
def create_compiled_keras_model():
  model = tf.keras.models.Sequential([
      tf.keras.layers.Dense(
          10, activation=tf.nn.softmax, kernel_initializer='zeros',
          input_shape=(784,))])

  model.compile(
      loss=tf.keras.losses.SparseCategoricalCrossentropy(),
      optimizer=tf.keras.optimizers.SGD(learning_rate=0.02),
      metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])
  return model
To use any model with TFF, it needs to be wrapped in a tff.learning.Model:
def model_fn():
  keras_model = create_compiled_keras_model()
  return tff.learning.from_compiled_keras_model(keras_model, sample_batch)
Training the model on federated data
The Federated Averaging algorithm:
#@test {"output": "ignore"}
iterative_process = tff.learning.build_federated_averaging_process(model_fn)
TFF constructs a pair of federated computations and packages them into a tff.utils.IterativeProcess, on which the computations are available as a pair of properties, initialize and next.
state = iterative_process.initialize()
Invoke the initialize computation to construct the server state.
The second of the pair of federated computations, next, represents a single round of Federated Averaging: pushing the server state (including the model parameters) to the clients, on-device training on their local data, collecting and averaging the model updates, and producing a new updated model on the server. It can be thought of as having the following functional type signature:
SERVER_STATE, FEDERATED_DATA -> SERVER_STATE, TRAINING_METRICS
next() is not merely a function that runs on a server; rather, it is a declarative functional representation of the entire decentralized computation: some inputs are provided by the server (SERVER_STATE), but each participating device contributes its own local dataset.
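The abstract signature above corresponds to a concrete federated type signature that can be inspected directly:
str(iterative_process.next.type_signature)
# Shows the full federated type: server model state and client datasets in,
# updated server state and training metrics out.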
#@test {"timeout": 600, "output": "ignore"}
state, metrics = iterative_process.next(state, federated_train_data)
print('round 1, metrics={}'.format(metrics))
A single round of training.
#@test {"skip": true}
for round_num in range(2, 11):
  state, metrics = iterative_process.next(state, federated_train_data)
  print('round {:2d}, metrics={}'.format(round_num, metrics))
Multiple rounds of training.
Customizing the model implementation
Define a namedtuple data structure to hold the model parameters and training metrics:
MnistVariables = collections.namedtuple(
    'MnistVariables', 'weights bias num_examples loss_sum accuracy_sum')
A function that creates the MNIST variables:
def create_mnist_variables():
  return MnistVariables(
      weights=tf.Variable(
          lambda: tf.zeros(dtype=tf.float32, shape=(784, 10)),
          name='weights',
          trainable=True),
      bias=tf.Variable(
          lambda: tf.zeros(dtype=tf.float32, shape=(10)),
          name='bias',
          trainable=True),
      num_examples=tf.Variable(0.0, name='num_examples', trainable=False),
      loss_sum=tf.Variable(0.0, name='loss_sum', trainable=False),
      accuracy_sum=tf.Variable(0.0, name='accuracy_sum', trainable=False))
With the variables for the model parameters and cumulative statistics in place, define the forward-pass method that computes the loss, makes predictions, and updates the cumulative statistics for a single batch of input data:
def mnist_forward_pass(variables, batch):
  y = tf.nn.softmax(tf.matmul(batch['x'], variables.weights) + variables.bias)
  predictions = tf.cast(tf.argmax(y, 1), tf.int32)

  flat_labels = tf.reshape(batch['y'], [-1])
  loss = -tf.reduce_mean(tf.reduce_sum(
      tf.one_hot(flat_labels, 10) * tf.math.log(y), axis=[1]))
  accuracy = tf.reduce_mean(
      tf.cast(tf.equal(predictions, flat_labels), tf.float32))

  num_examples = tf.cast(tf.size(batch['y']), tf.float32)

  # Accumulate the local statistics in the model's non-trainable variables.
  variables.num_examples.assign_add(num_examples)
  variables.loss_sum.assign_add(loss * num_examples)
  variables.accuracy_sum.assign_add(accuracy * num_examples)

  return loss, predictions
A method that returns a set of local metrics; these are the values that will be aggregated across clients, which requires correctly weighting the contributions from different users:
def get_local_mnist_metrics(variables):
  return collections.OrderedDict([
      ('num_examples', variables.num_examples),
      ('loss', variables.loss_sum / variables.num_examples),
      ('accuracy', variables.accuracy_sum / variables.num_examples)
  ])
aggregate_mnist_metrics_across_clients, the method that aggregates the local metrics, is a TFF federated computation. Its input, metrics, is what get_local_mnist_metrics returned above, but the values are no longer tf.Tensors: they have been boxed as tff.Values, which can no longer be manipulated with TensorFlow, only with TFF's federated operators such as tff.federated_mean and tff.federated_sum. The dictionary the function returns defines the set of metrics that will be available on the server:
@tff.federated_computation
def aggregate_mnist_metrics_across_clients(metrics):
  return {
      'num_examples': tff.federated_sum(metrics.num_examples),
      'loss': tff.federated_mean(metrics.loss, metrics.num_examples),
      'accuracy': tff.federated_mean(metrics.accuracy, metrics.num_examples)
  }
All state used by the model must be captured as TensorFlow variables, since TFF does not use Python at runtime.
The model should describe the form of data it accepts (input_spec), since in general TFF is a strongly-typed environment and wants to determine type signatures for all components; declaring the format of the model's input is an essential part of that.
While not technically required, it is recommended to wrap all TensorFlow logic (the forward pass, metric computations, etc.) as tf.functions, as this helps ensure the TensorFlow can be serialized and removes the need for explicit control dependencies.
class MnistModel(tff.learning.Model):

  def __init__(self):
    self._variables = create_mnist_variables()

  @property
  def trainable_variables(self):
    return [self._variables.weights, self._variables.bias]

  @property
  def non_trainable_variables(self):
    return []

  @property
  def local_variables(self):
    return [
        self._variables.num_examples, self._variables.loss_sum,
        self._variables.accuracy_sum
    ]

  @property
  def input_spec(self):
    return collections.OrderedDict([
        ('x', tf.TensorSpec([None, 784], tf.float32)),
        ('y', tf.TensorSpec([None, 1], tf.int32))])

  @tf.function
  def forward_pass(self, batch, training=True):
    del training  # Unused; this model behaves the same in training and evaluation.
    loss, predictions = mnist_forward_pass(self._variables, batch)
    return tff.learning.BatchOutput(loss=loss, predictions=predictions)

  @tf.function
  def report_local_outputs(self):
    return get_local_mnist_metrics(self._variables)

  @property
  def federated_output_computation(self):
    return aggregate_mnist_metrics_across_clients
Defining local training for the model:
class MnistTrainableModel(MnistModel, tff.learning.TrainableModel):

  @tf.function
  def train_on_batch(self, batch):
    output = self.forward_pass(batch)
    optimizer = tf.train.GradientDescentOptimizer(0.02)
    optimizer.minimize(output.loss, var_list=self.trainable_variables)
    return output
Federated training with the new model:
iterative_process = tff.learning.build_federated_averaging_process(
    MnistTrainableModel)
state = iterative_process.initialize()
#@test {"timeout": 600, "output": "ignore"}
state, metrics = iterative_process.next(state, federated_train_data)
print('round 1, metrics={}'.format(metrics))
#@test {"skip": true}
for round_num in range(2, 11):
  state, metrics = iterative_process.next(state, federated_train_data)
  print('round {:2d}, metrics={}'.format(round_num, metrics))
Evaluation
Evaluation on federated data uses the tff.learning.build_federated_evaluation constructor, which takes the model constructor as an argument. Evaluation does not perform gradient descent, so passing in MnistModel is sufficient; unlike Federated Averaging, there is no need for a trainable model such as MnistTrainableModel. For experimentation or research purposes, when a centralized test dataset is available, you can also load the federated model parameters into a standard Keras model and then simply call tf.keras.models.Model.evaluate() on the centralized dataset.
evaluation = tff.learning.build_federated_evaluation(MnistModel)
str(evaluation.type_signature)
#'(<<trainable=<weights=float32[784,10],bias=float32[10]>,non_trainable=<>>@SERVER,{<x=float32[?,784],y=int32[?,1]>*}@CLIENTS> -> <accuracy=float32@SERVER,loss=float32@SERVER,num_examples=float32@SERVER>)'
Invoke the evaluation function on the final state from training; extracting the model from the server state is as simple as accessing its .model member.
#@test {"output": "ignore"}
train_metrics = evaluation(state.model, federated_train_data)
#@test {"output": "ignore"}
str(train_metrics)
#'<accuracy=0.465455,loss=1.47458,num_examples=2750.0>'
Conclusions
Generally speaking, when enough data is available, transfer learning does not perform as well as training from scratch; federated transfer learning, accordingly, is rarely worth using unless compute capacity and training time are constrained. Notably, Google has never brought up the concept. My understanding is that federated learning already taps into massive datasets and trains only when device resources are in good shape, which is how it can reach a globally optimal model. P.S. Q. Yang is likewise just hyping a concept. (Personal opinion.)
References
https://www.tensorflow.org/federated