Running tasks in parallel – The.Swift.Dev.


Being able to run tasks in parallel is nice. It can certainly speed things up if you can utilize multiple CPU cores, but how can we actually implement these kinds of operations in Swift? 🤔

There are multiple ways of running parallel operations. I wrote a longer article about the Grand Central Dispatch (GCD) framework, where I explained the differences between parallelism and concurrency. I also demonstrated how to set up serial and concurrent dispatch queues, but this time I'd like to focus a bit more on tasks, workers and jobs.

Imagine that you have a picture which is 50000 pixels wide and 20000 pixels long; that's exactly one billion pixels. How would you alter the color of each pixel? Well, we could do this by iterating through each pixel and letting one core do the job, or we could run tasks in parallel.

The Dispatch framework offers multiple ways to solve this issue. The first solution is to use the concurrentPerform function and specify some number of workers. For the sake of simplicity, I'm going to add up an array of one billion numbers using 8 workers. 💪

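import Dispatch
import Foundation

let workers: Int = 8
// One billion Ints: roughly 8 GB of memory on a 64-bit platform.
let numbers: [Int] = Array(repeating: 1, count: 1_000_000_000)

// The workers run on different threads, so access to `sum` is guarded by a lock.
let lock = NSLock()
var sum = 0
DispatchQueue.concurrentPerform(iterations: workers) { index in
    // Each worker gets its own half-open slice of the array.
    let start = index * numbers.count / workers
    let end = (index + 1) * numbers.count / workers
    print("Worker #\(index), items: \(numbers[start..<end].count)")

    let partial = numbers[start..<end].reduce(0, +)
    lock.lock()
    sum += partial
    lock.unlock()
}

print("Sum: \(sum)")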
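The start and end calculation is what splits the array between the workers, and it is easier to follow with small numbers. Here is a minimal sketch, assuming a hypothetical `chunkRange` helper (it is not part of the Dispatch framework) that returns the half-open range a given worker is responsible for:

```swift
/// Hypothetical helper for illustration: the half-open index range
/// that worker `index` is responsible for.
func chunkRange(index: Int, count: Int, workers: Int) -> Range<Int> {
    let start = index * count / workers
    let end = (index + 1) * count / workers
    return start..<end
}

// 10 items split between 4 workers: 0..<2, 2..<5, 5..<7 and 7..<10.
let ranges = (0..<4).map { chunkRange(index: $0, count: 10, workers: 4) }
for (worker, range) in ranges.enumerated() {
    print("Worker #\(worker): \(range.lowerBound)..<\(range.upperBound)")
}
```

Because each range starts exactly where the previous one ends, every item is covered once, even when the item count is not evenly divisible by the number of workers.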

Cool, but still each worker has to work on quite a lot of numbers, maybe we shouldn’t start all the workers at once, but use a pool and run only a subset of them at a time. This is quite an easy task with operation queues, let me show you a basic example. 😎

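import Foundation

let workers: Int = 8
let numbers: [Int] = Array(repeating: 1, count: 1_000_000_000)

// At most 4 operations are allowed to run at the same time.
let operationQueue = OperationQueue()
operationQueue.maxConcurrentOperationCount = 4

// The operations run on different threads, so access to `sum` is guarded by a lock.
let lock = NSLock()
var sum = 0
for index in 0..<workers {
    let operation = BlockOperation {
        let start = index * numbers.count / workers
        let end = (index + 1) * numbers.count / workers
        print("Worker #\(index), items: \(numbers[start..<end].count)")

        let partial = numbers[start..<end].reduce(0, +)
        lock.lock()
        sum += partial
        lock.unlock()
    }
    operationQueue.addOperation(operation)
}

// Block the current thread until every operation has completed.
operationQueue.waitUntilAllOperationsAreFinished()

print("Sum: \(sum)")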
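Whenever several workers add to one shared variable, that access has to be synchronized. One alternative, sketched here under my own assumptions, is to give every worker its own slot in a preallocated array and combine the slots once all the workers are done. The `parallelSum` function is a hypothetical name used only for this illustration:

```swift
import Dispatch

/// Hypothetical helper: sums `numbers` in `workers` parallel chunks
/// without sharing a single accumulator between threads.
func parallelSum(_ numbers: [Int], workers: Int) -> Int {
    // One slot per worker, so no two workers ever write to the same element.
    var partials = [Int](repeating: 0, count: workers)
    partials.withUnsafeMutableBufferPointer { buffer in
        // Immutable copy of the buffer pointer for the concurrent closure.
        let slots = buffer
        DispatchQueue.concurrentPerform(iterations: workers) { index in
            let start = index * numbers.count / workers
            let end = (index + 1) * numbers.count / workers
            slots[index] = numbers[start..<end].reduce(0, +)
        }
    }
    // concurrentPerform returns only after every iteration has finished.
    return partials.reduce(0, +)
}

print("Sum: \(parallelSum(Array(1...10), workers: 3))") // Sum: 55
```

The trade-off is a small extra array in exchange for no locking on the hot path.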

Both of the examples above are more or less good to go (as long as access to the shared sum is synchronized to avoid a data race), but they depend on additional frameworks. In other words, they are non-native Swift solutions. What if we could do something better using structured concurrency?

let workers: Int = 8
let numbers: [Int] = Array(repeating: 1, count: 1_000_000_000)

let sum = await withTaskGroup(of: Int.self) { group in
    // Every child task returns the partial sum of its own slice.
    for i in 0..<workers {
        group.addTask {
            let start = i * numbers.count / workers
            let end = (i + 1) * numbers.count / workers
            return numbers[start..<end].reduce(0, +)
        }
    }

    // Results are consumed one by one here, so no extra locking is needed.
    var summary = 0
    for await result in group {
        summary += result
    }
    return summary
}

print("Sum: \(sum)")

By using task groups you can easily set up the workers and run them in parallel by adding a task to the group. Then you can wait for the partial sum results to arrive and sum everything up using a thread-safe solution. This approach is great, but is it possible to limit the maximum number of concurrent operations, just like we did with operation queues? 🤷‍♂️

func parallelTasks<T: Sendable>(
    iterations: Int,
    concurrency: Int,
    block: @escaping @Sendable (Int) async throws -> T
) async throws -> [T] {
    try await withThrowingTaskGroup(of: T.self) { group in
        var result: [T] = []

        for i in 0..<iterations {
            // Once `concurrency` tasks are in flight, wait for one to finish
            // before adding the next one.
            if i >= concurrency {
                if let res = try await group.next() {
                    result.append(res)
                }
            }
            group.addTask {
                try await block(i)
            }
        }

        // Collect the results of the tasks that are still running.
        for try await res in group {
            result.append(res)
        }
        return result
    }
}


let workers: Int = 8
let numbers: [Int] = Array(repeating: 1, count: 1_000_000_000)

let res = try await parallelTasks(
    iterations: workers,
    concurrency: 4
) { i in
    print(i)
    let start = i * numbers.count / workers
    let end = (i + 1) * numbers.count / workers
    return numbers[start..<end].reduce(0, +)
}

print("Sum: \(res.reduce(0, +))")

It is possible. I made a little helper function similar to the concurrentPerform method; this way you can execute a number of tasks and limit the level of concurrency. The main idea is to run a number of iterations, and when the index reaches the maximum number of concurrent items, you wait until a work item finishes and only then add a new task to the group. Before the helper returns, you also have to await all the remaining results and append them to the result array. Keep in mind that results are appended as the tasks complete, so their order does not necessarily match the iteration order. 😊
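If the results need to come back in iteration order, one option is to return the index together with each value and write it into the matching slot. The following is only a sketch of that idea and not the helper from this article; `orderedParallelTasks` is a hypothetical name, and the `Sendable` constraints are my assumption about what a recent compiler expects:

```swift
/// Hypothetical variant: the same sliding-window idea, but each child task
/// also returns its index so the results can be put back in order.
func orderedParallelTasks<T: Sendable>(
    iterations: Int,
    concurrency: Int,
    block: @escaping @Sendable (Int) async throws -> T
) async throws -> [T] {
    try await withThrowingTaskGroup(of: (Int, T).self) { group in
        var slots = [T?](repeating: nil, count: iterations)

        for i in 0..<iterations {
            // Once the window is full, wait for one task before adding another.
            if i >= concurrency, let (index, value) = try await group.next() {
                slots[index] = value
            }
            group.addTask {
                let value = try await block(i)
                return (i, value)
            }
        }

        // Drain the tasks that are still running.
        for try await (index, value) in group {
            slots[index] = value
        }
        // Every slot has been filled at this point.
        return slots.compactMap { $0 }
    }
}

let squares = try await orderedParallelTasks(iterations: 8, concurrency: 3) { i in
    i * i
}
print(squares) // [0, 1, 4, 9, 16, 25, 36, 49]
```

For the sum example the order does not matter, so the simpler helper is enough there.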

That’s it for now, I hope this little article will help you to manage concurrent operations a bit better.
