[Computing] Python Ray library

Ray is a library that makes it easy to program in the actor model; for background on the actor model itself, see See Also 1. The quickstart below launches remote tasks; a minimal actor sketch follows it.

Install:

pip install -U ray

Quickstart:

import ray

ray.init()


# Define the square task.
@ray.remote
def square(x):
    return x * x

# Launch four parallel square tasks.
futures = [square.remote(i) for i in range(4)]

# Retrieve results.
print(ray.get(futures))
# -> [0, 1, 4, 9]
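
The same @ray.remote decorator on a class creates an actor: a stateful worker whose method calls execute one at a time. A minimal sketch, not from the original post (the Counter class is illustrative):

import ray

ray.init()  # skip if already initialized above

@ray.remote
class Counter:
    def __init__(self):
        self.value = 0

    def increment(self) -> int:
        self.value += 1
        return self.value

# Method calls from the same caller run on the actor in submission order.
counter = Counter.remote()
refs = [counter.increment.remote() for _ in range(3)]
print(ray.get(refs))
# -> [1, 2, 3]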

Flow

Work in progress…

First, create the ProgressActor and launch the sampling_task calls with .remote(); each call returns immediately, and the task runs on a separate Ray worker process rather than in the caller's thread (a quick demonstration below).
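
A minimal sketch (not from the post) showing that .remote() returns immediately while ray.get() blocks:

import time

import ray

ray.init()

@ray.remote
def slow() -> str:
    time.sleep(2)
    return "done"

start = time.time()
ref = slow.remote()  # returns an ObjectRef right away; the task runs on a worker
print(f"after .remote(): {time.time() - start:.2f}s")  # ~0.00s
print(ray.get(ref))  # blocks until the task completes
print(f"after ray.get(): {time.time() - start:.2f}s")  # ~2.00s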

The driver then calls progress_actor.get_progress.remote() and reads the result with ray.get(), which blocks until the value is ready; time.sleep(1) pauses between polls, so progress is checked roughly once per second until it reaches 100%.
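
An aside, not used in the post: if you only need completion events rather than a percentage, ray.wait blocks on task completion directly, with no progress actor involved:

import time

import ray

ray.init()

@ray.remote
def work(i: int) -> int:
    time.sleep(i)
    return i

remaining = [work.remote(i) for i in range(4)]
while remaining:
    # ray.wait splits the refs into (ready, not ready).
    done, remaining = ray.wait(remaining, num_returns=1)
    print("finished:", ray.get(done[0]))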

Sequence diagram (original image not preserved). The full example:
import ray
import math
import time
import random

ray.init()


# Actor that aggregates progress reports from all sampling tasks.
# Ray runs actor method calls one at a time, so updates to the dict
# below never race with get_progress().
@ray.remote
class ProgressActor:
    def __init__(self, total_num_samples: int):
        self.total_num_samples = total_num_samples
        self.num_samples_completed_per_task = {}

    def report_progress(self, task_id: int, num_samples_completed: int) -> None:
        self.num_samples_completed_per_task[task_id] = num_samples_completed

    def get_progress(self) -> float:
        return (
            sum(self.num_samples_completed_per_task.values()) / self.total_num_samples
        )

# Monte Carlo task: counts the random points that land inside the unit
# circle and reports its progress to the shared actor along the way.
@ray.remote
def sampling_task(num_samples: int, task_id: int,
                  progress_actor: ray.actor.ActorHandle) -> int:
    num_inside = 0
    for i in range(num_samples):
        x, y = random.uniform(-1, 1), random.uniform(-1, 1)
        if math.hypot(x, y) <= 1:
            num_inside += 1

        # Report progress every 1 million samples.
        if (i + 1) % 1_000_000 == 0:
            # This is async.
            progress_actor.report_progress.remote(task_id, i + 1)

    # Report the final progress.
    progress_actor.report_progress.remote(task_id, num_samples)
    return num_inside

# Change this to match your cluster scale.
NUM_SAMPLING_TASKS = 10
NUM_SAMPLES_PER_TASK = 10_000_000
TOTAL_NUM_SAMPLES = NUM_SAMPLING_TASKS * NUM_SAMPLES_PER_TASK

# Create the progress actor.
progress_actor = ProgressActor.remote(TOTAL_NUM_SAMPLES)

# Create and execute all sampling tasks in parallel.
results = [
    sampling_task.remote(NUM_SAMPLES_PER_TASK, i, progress_actor)
    for i in range(NUM_SAMPLING_TASKS)
]


# Query progress periodically.
while True:
    progress = ray.get(progress_actor.get_progress.remote())
    print(f"Progress: {int(progress * 100)}%")

    # Exact comparison is safe here: once every task has sent its final
    # report, the integer sum equals TOTAL_NUM_SAMPLES, so the ratio is
    # exactly 1.0.
    if progress == 1:
        break

    time.sleep(1)


# Get all the sampling task results; this ray.get() blocks until every
# task has finished.
total_num_inside = sum(ray.get(results))
pi = (total_num_inside * 4) / TOTAL_NUM_SAMPLES
print(f"Estimated value of π is: {pi}")

See Also

  1. Data Parallel, Task Parallel, and Agent Actor Architectures – bytewax
  2. Data Parallel, Task Parallel, and Agent Actor Architectures | Hacker News
