TensorFlow Federated


By Junyangz at IIE, CAS.

Background

  • Much of the data is born decentralized (phones & IoT devices)

  • learning is easy if the data is in one place

  • Centralization has disadvantages

    • user experience

      • latency

      • offline

    • resource limits

      • data caps

      • battery life

    • privacy concerns

      • sensitive data

  • Learning on-device without collecting the data

    • often too little data per client

    • other clients aren't contributing

    • pre-training may help... sometimes

  • Learn together --- Federated Learning

Overview

TensorFlow Federated (TFF) is an open-source framework for machine learning and other computations on decentralized data. TFF has been developed to facilitate open research and experimentation with Federated Learning (FL), an approach to machine learning where a shared global model is trained across many participating clients that keep their training data locally. For example, FL has been used to train prediction models for mobile keyboards without uploading sensitive typing data to servers.

TFF enables developers to simulate the included federated learning algorithms on their models and data, as well as to experiment with novel algorithms. The building blocks provided by TFF can also be used to implement non-learning computations, such as aggregated analytics over decentralized data. TFF’s interfaces are organized in two layers:

Federated Learning (FL) API This layer offers a set of high-level interfaces that allow developers to apply the included implementations of federated training and evaluation to their existing TensorFlow models.

Federated Core (FC) API At the core of the system is a set of lower-level interfaces for concisely expressing novel federated algorithms by combining TensorFlow with distributed communication operators within a strongly-typed functional programming environment. This layer also serves as the foundation upon which we've built Federated Learning.

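To give a flavor of the FC API, here is a minimal sketch of a federated computation built only from the public tff package: it averages a float32 value placed at the clients (get_average_temperature is an illustrative name, not a TFF API):

import tensorflow as tf
import tensorflow_federated as tff

# A float32 value placed at the clients.
READINGS_TYPE = tff.FederatedType(tf.float32, tff.CLIENTS)


@tff.federated_computation(READINGS_TYPE)
def get_average_temperature(sensor_readings):
  # A distributed communication operator: average the client
  # values and place the result at the server.
  return tff.federated_mean(sensor_readings)


# In the simulation runtime it is invoked like a Python function:
# get_average_temperature([68.5, 70.3, 69.8]) ~> 69.53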

  • Federated Learning (FL) API layer: tff.learning

A set of higher-level interfaces that can be used to perform common types of federated learning tasks, such as federated training, against user-provided models that are already implemented in TensorFlow.

Preparing the input data

  • The TFF repository ships with several datasets, including Leaf, the federated-learning benchmark dataset produced by CMU. Since every author has a unique writing style, this dataset exhibits non-i.i.d. behavior, which is exactly what is expected of federated datasets.

#@test {"output": "ignore"}
import collections

import tensorflow as tf
import tensorflow_federated as tff

emnist_train, emnist_test = tff.simulation.datasets.emnist.load_data()
emnist_train.output_types, emnist_train.output_shapes
# (OrderedDict([(u'label', tf.int32), (u'pixels', tf.float32)]),
#  OrderedDict([(u'label', TensorShape([])), (u'pixels', TensorShape([28, 28]))]))

# The dataset of a single simulated client.
example_dataset = emnist_train.create_tf_dataset_for_client(
    emnist_train.client_ids[0])

example_element = next(iter(example_dataset))

example_element['label'].numpy()
  • Flatten each 28x28 image in the MNIST data into a 784-element array.

  • The data is already a tf.data.Dataset, so preprocessing can be done with dataset transformations.

  • Shuffle the individual examples, organize them into batches, and rename the pixels and label features to x and y for use with Keras. We also repeat the dataset in order to run several epochs.

NUM_EPOCHS = 10
BATCH_SIZE = 20
SHUFFLE_BUFFER = 500


def preprocess(dataset):

  def element_fn(element):
    return collections.OrderedDict([
        ('x', tf.reshape(element['pixels'], [-1])),
        ('y', tf.reshape(element['label'], [1])),
    ])

  return dataset.repeat(NUM_EPOCHS).map(element_fn).shuffle(
      SHUFFLE_BUFFER).batch(BATCH_SIZE)


def make_federated_data(client_data, client_ids):
  return [preprocess(client_data.create_tf_dataset_for_client(x))
          for x in client_ids]

#@test {"output": "ignore"}
NUM_CLIENTS = 3

sample_clients = emnist_train.client_ids[0:NUM_CLIENTS]

federated_train_data = make_federated_data(emnist_train, sample_clients)
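
As a quick sanity check, the result is a list holding one preprocessed tf.data.Dataset per sampled client:

len(federated_train_data)  # == NUM_CLIENTS, i.e. 3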

Creating a model with Keras

  • The optimizer and learning rate may differ from the choices that work best on an i.i.d. dataset.

def create_compiled_keras_model():
  model = tf.keras.models.Sequential([
      tf.keras.layers.Dense(
          10, activation=tf.nn.softmax, kernel_initializer='zeros', input_shape=(784,))])
 
  model.compile(
      loss=tf.keras.losses.SparseCategoricalCrossentropy(),
      optimizer=tf.keras.optimizers.SGD(learning_rate=0.02),
      metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])
  return model
  • To use any model with TFF, it must be wrapped in a tff.learning.Model.

# A representative batch; TFF uses it to infer the model's input types.
sample_batch = tf.nest.map_structure(
    lambda x: x.numpy(), next(iter(preprocess(example_dataset))))


def model_fn():
  keras_model = create_compiled_keras_model()
  return tff.learning.from_compiled_keras_model(keras_model, sample_batch)

Training the model on federated data

  • The Federated Averaging algorithm

#@test {"output": "ignore"}
iterative_process = tff.learning.build_federated_averaging_process(model_fn)

TFF constructs a pair of federated computations and packages them into a tff.utils.IterativeProcess, on which the two computations are available as a pair of attributes, initialize and next.

state = iterative_process.initialize()

Invoke the initialize computation to construct the server state.

The second of the pair of federated computations, next, represents a single round of Federated Averaging, which consists of pushing the server state (including the model parameters) to the clients, training on-device on their local data, collecting and averaging the model updates, and producing a new updated model at the server. It can be pictured as the following process:

SERVER_STATE, FEDERATED_DATA -> SERVER_STATE, TRAINING_METRICS

next() is not merely a function that runs on a server; rather, it is a declarative functional representation of the entire decentralized computation: some inputs are provided by the server (SERVER_STATE), but each participating device contributes its own local dataset.
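
Both initialize and next are TFF computations with full federated type signatures, which can be inspected directly; a quick sketch (the exact rendering of the string varies across TFF versions):

str(iterative_process.next.type_signature)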

#@test {"timeout": 600, "output": "ignore"}
state, metrics = iterative_process.next(state, federated_train_data)
print('round  1, metrics={}'.format(metrics))

A single round of training

#@test {"skip": true}
for round_num in range(2, 11):
  state, metrics = iterative_process.next(state, federated_train_data)
  print('round {:2d}, metrics={}'.format(round_num, metrics))

Multiple rounds of training


Customizing the model implementation

  • Define a namedtuple data structure to hold the model parameters and the cumulative training metrics.

MnistVariables = collections.namedtuple(
    'MnistVariables', 'weights bias num_examples loss_sum accuracy_sum')
  • A function that creates the MNIST variables.

def create_mnist_variables():
  return MnistVariables(
      weights = tf.Variable(
          lambda: tf.zeros(dtype=tf.float32, shape=(784, 10)),
          name='weights',
          trainable=True),
      bias = tf.Variable(
          lambda: tf.zeros(dtype=tf.float32, shape=(10)),
          name='bias',
          trainable=True),
      num_examples = tf.Variable(0.0, name='num_examples', trainable=False),
      loss_sum = tf.Variable(0.0, name='loss_sum', trainable=False),
      accuracy_sum = tf.Variable(0.0, name='accuracy_sum', trainable=False))
  • Given the variables for the model parameters and the cumulative statistics, define the forward-pass method that computes the loss, makes predictions, and updates the cumulative statistics for a single batch of input data.

def mnist_forward_pass(variables, batch):
  y = tf.nn.softmax(tf.matmul(batch['x'], variables.weights) + variables.bias)
  predictions = tf.cast(tf.argmax(y, 1), tf.int32)

  flat_labels = tf.reshape(batch['y'], [-1])
  loss = -tf.reduce_mean(tf.reduce_sum(
      tf.one_hot(flat_labels, 10) * tf.math.log(y), axis=[1]))
  accuracy = tf.reduce_mean(
      tf.cast(tf.equal(predictions, flat_labels), tf.float32))

  num_examples = tf.cast(tf.size(batch['y']), tf.float32)

  # Update the cumulative, client-local statistics in place.
  variables.num_examples.assign_add(num_examples)
  variables.loss_sum.assign_add(loss * num_examples)
  variables.accuracy_sum.assign_add(accuracy * num_examples)

  return loss, predictions
  • A method that returns the local metrics; contributions from different users must be correctly weighted when these are aggregated.

def get_local_mnist_metrics(variables):
  return collections.OrderedDict([
      ('num_examples', variables.num_examples),
      ('loss', variables.loss_sum / variables.num_examples),
      ('accuracy', variables.accuracy_sum / variables.num_examples)
    ])
  • aggregate_mnist_metrics_across_clients is the method that aggregates the local metric data.

  • It is a TFF federated computation expression.

  • Its metrics input is what get_local_mnist_metrics above returned, but the values are no longer tf.Tensors: they have been boxed as tff.Values, and can no longer be manipulated with TensorFlow, only with TFF's federated operators such as tff.federated_mean and tff.federated_sum.

  • The dictionary the function returns defines the set of metrics that will be available on the server.

@tff.federated_computation
def aggregate_mnist_metrics_across_clients(metrics):
  return {
      'num_examples': tff.federated_sum(metrics.num_examples),
      'loss': tff.federated_mean(metrics.loss, metrics.num_examples),
      'accuracy': tff.federated_mean(metrics.accuracy, metrics.num_examples)
  }
  • All state that the model uses must be captured as TensorFlow variables, because TFF does not use Python at runtime.

  • The model should describe the form of data it accepts (input_spec), because in general TFF is a strongly-typed environment and wants to determine type signatures for all components; declaring the format of the model's input is an essential part of that.

  • Although not technically required, wrapping all TensorFlow logic (the forward pass, metric computations, and so on) as tf.functions is recommended, as it helps ensure the TensorFlow can be serialized and avoids the need for explicit control dependencies.

class MnistModel(tff.learning.Model):

  def __init__(self):
    self._variables = create_mnist_variables()

  @property
  def trainable_variables(self):
    return [self._variables.weights, self._variables.bias]

  @property
  def non_trainable_variables(self):
    return []

  @property
  def local_variables(self):
    return [
        self._variables.num_examples, self._variables.loss_sum,
        self._variables.accuracy_sum
    ]

  @property
  def input_spec(self):
    return collections.OrderedDict([('x', tf.TensorSpec([None, 784],
                                                        tf.float32)),
                                    ('y', tf.TensorSpec([None, 1], tf.int32))])

  @tf.function
  def forward_pass(self, batch, training=True):
    del training
    loss, predictions = mnist_forward_pass(self._variables, batch)
    return tff.learning.BatchOutput(loss=loss, predictions=predictions)

  @tf.function
  def report_local_outputs(self):
    return get_local_mnist_metrics(self._variables)

  @property
  def federated_output_computation(self):
    return aggregate_mnist_metrics_across_clients
  • Defining local training for the model.

class MnistTrainableModel(MnistModel, tff.learning.TrainableModel):

  @tf.function
  def train_on_batch(self, batch):
    output = self.forward_pass(batch)
    optimizer = tf.train.GradientDescentOptimizer(0.02)
    optimizer.minimize(output.loss, var_list=self.trainable_variables)
    return output
  • Federated training with the new model.

iterative_process = tff.learning.build_federated_averaging_process(
    MnistTrainableModel)
state = iterative_process.initialize()
#@test {"timeout": 600, "output": "ignore"}
state, metrics = iterative_process.next(state, federated_train_data)
print('round  1, metrics={}'.format(metrics))
#@test {"skip": true}
for round_num in range(2, 11):
  state, metrics = iterative_process.next(state, federated_train_data)
  print('round {:2d}, metrics={}'.format(round_num, metrics))

Evaluation

  • Evaluation on federated data requires the tff.learning.build_federated_evaluation constructor, which takes the model constructor as an argument.

  • Evaluation does not perform gradient descent, so passing in MnistModel is sufficient; unlike Federated Averaging, there is no need for a trainable model such as MnistTrainableModel.

  • For experimentation and research purposes, when a centralized test dataset is available, the federatedly learned model parameters can be applied to a standard Keras model, which is then evaluated by simply calling tf.keras.models.Model.evaluate() on the centralized dataset (see the sketch at the end of this section).

evaluation = tff.learning.build_federated_evaluation(MnistModel)
str(evaluation.type_signature)
#'(<<trainable=<weights=float32[784,10],bias=float32[10]>,non_trainable=<>>@SERVER,{<x=float32[?,784],y=int32[?,1]>*}@CLIENTS> -> <accuracy=float32@SERVER,loss=float32@SERVER,num_examples=float32@SERVER>)'
  • Invoke the evaluation function on the final state; to extract the model from the server state, simply access its .model member.

#@test {"output": "ignore"}
train_metrics = evaluation(state.model, federated_train_data)
#@test {"output": "ignore"}
str(train_metrics)
#'<accuracy=0.465455,loss=1.47458,num_examples=2750.0>'
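
For the centralized check mentioned above, a minimal sketch, assuming tff.learning.assign_weights_to_keras_model is available in this TFF version and that centralized_x and centralized_y are hypothetical NumPy arrays prepared elsewhere:

keras_model = create_compiled_keras_model()
# Copy the trained federated model parameters into the Keras model.
tff.learning.assign_weights_to_keras_model(keras_model, state.model)
# Then evaluate directly on the centralized data:
# keras_model.evaluate(centralized_x, centralized_y)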

Conclusions

  • Generally speaking, when enough data is available, transfer learning performs worse than training entirely from scratch, so federated transfer learning is rarely used unless compute capacity or training time is constrained; notably, Google has never mentioned the concept. My understanding is that federated learning itself already taps massive datasets and can train when device resources are in good shape, and can thus reach a globally optimal model. P.S. Q. Yang is also just hyping a concept. (Personal opinion)
