Spawn Federated Example

October 19, 2022 ยท View on GitHub

Certain use cases require more complex and careful modeling of Actors. Here we exemplify the splitting and data aggregation scenario using Actors.

Targets:

Data Processing

Federated Machine Learning

Complex integration scenarios

Run

First up the Elixir application:

make run

NOTE: This example uses the MySQL database as persistent storage for its actors. And it is also expected that you have previously created a database called eigr-functions-db in the MySQL instance.

Second execute call for sending Task to Coordinator Actor:

iex(federated_01@127.0.0.1)1>list = [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20]
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20]
iex(federated_01@127.0.0.1)4> task = %Federated.Domain.TaskRequest{
...(federated_01@127.0.0.1)4>      id: Uniq.UUID.uuid4(),
...(federated_01@127.0.0.1)4>      workers: 2,
...(federated_01@127.0.0.1)4>      task_strategy: :SUM,
...(federated_01@127.0.0.1)4>      aggregation_strategy: :SUM_TASKS,
...(federated_01@127.0.0.1)4>      data: %Federated.Domain.Data{numbers: list}
...(federated_01@127.0.0.1)4>    }

%Federated.Domain.TaskRequest{
  id: "15391a75-716d-4e2f-a9fa-3c9900d710b2",
  data: %Federated.Domain.Data{
    numbers: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19,
     20],
    __unknown_fields__: []
  },
  workers: 2,
  __unknown_fields__: []
}
iex(federated_01@127.0.0.1)6> {:ok, task_id} = SpawnFederatedExample.Client.push(task)
{:ok, "86255146-8561-404a-b958-423d7c159ed2"}

In the logs you will see some messages like these:

2022-10-14 19:39:41.667 [federated_01@127.0.0.1]:[pid=<0.939.0> ]:[debug]:TaskCoordinator Received Aggregate Results to Join. Response: [%Federated.Domain.FederatedTaskResult{id: "331131c9-14f2-4a73-9d56-bc24c42f28f2", worker_id: "worker-0", data: %Federated.Domain.Result{data: 55, __unknown_fields__: []}, status: :DONE, __unknown_fields__: []}]

2022-10-14 19:39:41.669 [federated_01@127.0.0.1]:[pid=<0.939.0> ]:[debug]:TaskCoordinator Received Aggregate Results to Join. Response: [%Federated.Domain.FederatedTaskResult{id: "50bb96bb-886e-4656-b896-202dbaa2b279", worker_id: "worker-1", data: %Federated.Domain.Result{data: 155, __unknown_fields__: []}, status: :DONE, __unknown_fields__: []}]

This means that the Coordinator generated the subtasks, generated two workers worker-0 and worker-1 sent the subtasks to them and received the results back for aggregation.

Checking the results

Use fetch operation for get aggregated task result:

iex(federated_01@127.0.0.1)7>SpawnFederatedExample.Client.fetch(task_id)
{:ok,
 %Federated.Domain.Coordinator.Summary{
   task_id: "59cb2271-d88d-4a21-a240-d4a0b76f9020",
   sub_tasks: 2,
   response: %Federated.Domain.TaskResponse{
     id: "59cb2271-d88d-4a21-a240-d4a0b76f9020",
     result: %Federated.Domain.Result{data: 210, __unknown_fields__: []},
     __unknown_fields__: []
   },
   status: :DONE,
   __unknown_fields__: []
 }}
iex(federated_01@127.0.0.1)8>