一、CPU密集型任務(wù)開(kāi)發(fā)指導(dǎo)
CPU密集型任務(wù)是指需要占用系統(tǒng)資源處理大量計(jì)算能力的任務(wù),需要長(zhǎng)時(shí)間運(yùn)行,這段時(shí)間會(huì)阻塞線(xiàn)程其它事件的處理,不適宜放在主線(xiàn)程進(jìn)行。例如圖像處理、視頻編碼、數(shù)據(jù)分析等。
基于多線(xiàn)程并發(fā)機(jī)制處理CPU密集型任務(wù)可以提高CPU利用率,提升應(yīng)用程序響應(yīng)速度。
當(dāng)進(jìn)行一系列同步任務(wù)時(shí),推薦使用Worker;而進(jìn)行大量或調(diào)度點(diǎn)較為分散的獨(dú)立任務(wù)時(shí),不方便使用8個(gè)Worker去做負(fù)載管理,推薦采用TaskPool。接下來(lái)將以圖像直方圖處理以及后臺(tái)長(zhǎng)時(shí)間的模型預(yù)測(cè)任務(wù)分別進(jìn)行舉例。
使用TaskPool進(jìn)行圖像直方圖處理
? 1. 實(shí)現(xiàn)圖像處理的業(yè)務(wù)邏輯。
? 2. 數(shù)據(jù)分段,將各段數(shù)據(jù)通過(guò)不同任務(wù)的執(zhí)行完成圖像處理。
創(chuàng)建Task,通過(guò)execute()執(zhí)行任務(wù),在當(dāng)前任務(wù)結(jié)束后,會(huì)將直方圖處理結(jié)果同時(shí)返回。
? 3. 結(jié)果數(shù)組匯總處理。
import taskpool from '@ohos.taskpool'; @Concurrent function imageProcessing(dataSlice: ArrayBuffer) { // 步驟1: 具體的圖像處理操作及其他耗時(shí)操作 return dataSlice; } function histogramStatistic(pixelBuffer: ArrayBuffer) { // 步驟2: 分成三段并發(fā)調(diào)度 let number = pixelBuffer.byteLength / 3; let buffer1 = pixelBuffer.slice(0, number); let buffer2 = pixelBuffer.slice(number, number * 2); let buffer3 = pixelBuffer.slice(number * 2); let task1 = new taskpool.Task(imageProcessing, buffer1); let task2 = new taskpool.Task(imageProcessing, buffer2); let task3 = new taskpool.Task(imageProcessing, buffer3); taskpool.execute(task1).then((ret: ArrayBuffer[]) => { // 步驟3: 結(jié)果處理 }); taskpool.execute(task2).then((ret: ArrayBuffer[]) => { // 步驟3: 結(jié)果處理 }); taskpool.execute(task3).then((ret: ArrayBuffer[]) => { // 步驟3: 結(jié)果處理 }); } @Entry @Component struct Index { @State message: string = 'Hello World' build() { Row() { Column() { Text(this.message) .fontSize(50) .fontWeight(FontWeight.Bold) .onClick(() => { let data: ArrayBuffer; histogramStatistic(data); }) } .width('100%') } .height('100%') } }
使用Worker進(jìn)行長(zhǎng)時(shí)間數(shù)據(jù)分析
本文通過(guò)某地區(qū)提供的房?jī)r(jià)數(shù)據(jù)訓(xùn)練一個(gè)簡(jiǎn)易的房?jī)r(jià)預(yù)測(cè)模型,該模型支持通過(guò)輸入房屋面積和房間數(shù)量去預(yù)測(cè)該區(qū)域的房?jī)r(jià),模型需要長(zhǎng)時(shí)間運(yùn)行,房?jī)r(jià)預(yù)測(cè)需要使用前面的模型運(yùn)行結(jié)果,因此需要使用Worker。
? 1. DevEco Studio提供了Worker創(chuàng)建的模板,新建一個(gè)Worker線(xiàn)程,例如命名為“MyWorker”。


? 2. 在主線(xiàn)程中通過(guò)調(diào)用ThreadWorker的constructor()方法創(chuàng)建Worker對(duì)象,當(dāng)前線(xiàn)程為宿主線(xiàn)程。
import worker from '@ohos.worker';
const workerInstance = new worker.ThreadWorker('entry/ets/workers/MyWorker.ts');
? 3. 在宿主線(xiàn)程中通過(guò)調(diào)用onmessage()方法接收Worker線(xiàn)程發(fā)送過(guò)來(lái)的消息,并通過(guò)調(diào)用postMessage()方法向Worker線(xiàn)程發(fā)送消息。
例如向Worker線(xiàn)程發(fā)送訓(xùn)練和預(yù)測(cè)的消息,同時(shí)接收Worker線(xiàn)程發(fā)送回來(lái)的消息。
// 接收Worker子線(xiàn)程的結(jié)果
workerInstance.onmessage = function(e) {
// data:主線(xiàn)程發(fā)送的信息
let data = e.data;
console.info('MyWorker.ts onmessage');
// 在Worker線(xiàn)程中進(jìn)行耗時(shí)操作
}
workerInstance.onerror = function (d) {
// 接收Worker子線(xiàn)程的錯(cuò)誤信息
}
// 向Worker子線(xiàn)程發(fā)送訓(xùn)練消息
workerInstance.postMessage({ 'type': 0 });
// 向Worker子線(xiàn)程發(fā)送預(yù)測(cè)消息
workerInstance.postMessage({ 'type': 1, 'value': [90, 5] });
? 4. 在MyWorker.ts文件中綁定Worker對(duì)象,當(dāng)前線(xiàn)程為Worker線(xiàn)程。
import worker, { ThreadWorkerGlobalScope, MessageEvents, ErrorEvent } from '@ohos.worker';
let workerPort: ThreadWorkerGlobalScope = worker.workerPort;
? 5. 在Worker線(xiàn)程中通過(guò)調(diào)用onmessage()方法接收宿主線(xiàn)程發(fā)送的消息內(nèi)容,并通過(guò)調(diào)用postMessage()方法向宿主線(xiàn)程發(fā)送消息。
如在Worker線(xiàn)程中定義預(yù)測(cè)模型及其訓(xùn)練過(guò)程,同時(shí)與主線(xiàn)程進(jìn)行信息交互。
import worker, { ThreadWorkerGlobalScope, MessageEvents, ErrorEvent } from '@ohos.worker';
let workerPort: ThreadWorkerGlobalScope = worker.workerPort;
// 定義訓(xùn)練模型及結(jié)果
let result;
// 定義預(yù)測(cè)函數(shù)
function predict(x) {
return result[x];
}
// 定義優(yōu)化器訓(xùn)練過(guò)程
function optimize() {
result = {};
}
// Worker線(xiàn)程的onmessage邏輯
workerPort.onmessage = function (e: MessageEvents) {
let data = e.data
// 根據(jù)傳輸?shù)臄?shù)據(jù)的type選擇進(jìn)行操作
switch (data.type) {
case 0:
// 進(jìn)行訓(xùn)練
optimize();
// 訓(xùn)練之后發(fā)送主線(xiàn)程訓(xùn)練成功的消息
workerPort.postMessage({ type: 'message', value: 'train success.' });
break;
case 1:
// 執(zhí)行預(yù)測(cè)
const output = predict(data.value);
// 發(fā)送主線(xiàn)程預(yù)測(cè)的結(jié)果
workerPort.postMessage({ type: 'predict', value: output });
break;
default:
workerPort.postMessage({ type: 'message', value: 'send message is invalid' });
break;
}
}
? 6. 在Worker線(xiàn)程中完成任務(wù)之后,執(zhí)行Worker線(xiàn)程銷(xiāo)毀操作。銷(xiāo)毀線(xiàn)程的方式主要有兩種:根據(jù)需要可以在宿主線(xiàn)程中對(duì)Worker線(xiàn)程進(jìn)行銷(xiāo)毀;也可以在Worker線(xiàn)程中主動(dòng)銷(xiāo)毀Worker線(xiàn)程。
在宿主線(xiàn)程中通過(guò)調(diào)用onexit()方法定義Worker線(xiàn)程銷(xiāo)毀后的處理邏輯。
// Worker線(xiàn)程銷(xiāo)毀后,執(zhí)行onexit回調(diào)方法
workerInstance.onexit = function() {
console.info("main thread terminate");
}
方式一:在宿主線(xiàn)程中通過(guò)調(diào)用terminate()方法銷(xiāo)毀Worker線(xiàn)程,并終止Worker接收消息。
// 銷(xiāo)毀Worker線(xiàn)程 workerInstance.terminate();
方式二:在Worker線(xiàn)程中通過(guò)調(diào)用close()方法主動(dòng)銷(xiāo)毀Worker線(xiàn)程,并終止Worker接收消息。
// 銷(xiāo)毀線(xiàn)程 workerPort.close();
二、 I/O密集型任務(wù)開(kāi)發(fā)指導(dǎo)
使用異步并發(fā)可以解決單次I/O任務(wù)阻塞的問(wèn)題,但是如果遇到I/O密集型任務(wù),同樣會(huì)阻塞線(xiàn)程中其它任務(wù)的執(zhí)行,這時(shí)需要使用多線(xiàn)程并發(fā)能力來(lái)進(jìn)行解決。
I/O密集型任務(wù)的性能重點(diǎn)通常不在于CPU的處理能力,而在于I/O操作的速度和效率。這種任務(wù)通常需要頻繁地進(jìn)行磁盤(pán)讀寫(xiě)、網(wǎng)絡(luò)通信等操作。此處以頻繁讀寫(xiě)系統(tǒng)文件來(lái)模擬I/O密集型并發(fā)任務(wù)的處理。
? 1. 定義并發(fā)函數(shù),內(nèi)部密集調(diào)用I/O能力。
import fs from '@ohos.file.fs';
// 定義并發(fā)函數(shù),內(nèi)部密集調(diào)用I/O能力
@Concurrent
async function concurrentTest(fileList: string[]) {
// 寫(xiě)入文件的實(shí)現(xiàn)
async function write(data, filePath) {
let file = await fs.open(filePath, fs.OpenMode.READ_WRITE);
await fs.write(file.fd, data);
fs.close(file);
}
// 循環(huán)寫(xiě)文件操作
for (let i = 0; i < fileList.length; i++) {
write('Hello World!', fileList[i]).then(() =?> {
console.info(`Succeeded in writing the file. FileList: ${fileList[i]}`);
}).catch((err) => {
console.error(`Failed to write the file. Code is ${err.code}, message is ${err.message}`)
return false;
})
}
return true;
}
? 2. 使用TaskPool執(zhí)行包含密集I/O的并發(fā)函數(shù):通過(guò)調(diào)用execute()方法執(zhí)行任務(wù),并在回調(diào)中進(jìn)行調(diào)度結(jié)果處理。示例中的filePath1和filePath2的獲取方式請(qǐng)參見(jiàn)獲取應(yīng)用文件路徑。
import taskpool from '@ohos.taskpool';
let filePath1 = ...; // 應(yīng)用文件路徑
let filePath2 = ...;
// 使用TaskPool執(zhí)行包含密集I/O的并發(fā)函數(shù)
// 數(shù)組較大時(shí),I/O密集型任務(wù)任務(wù)分發(fā)也會(huì)搶占主線(xiàn)程,需要使用多線(xiàn)程能力
taskpool.execute(concurrentTest, [filePath1, filePath2]).then((ret) => {
// 調(diào)度結(jié)果處理
console.info(`The result: ${ret}`);
})
審核編輯 黃宇
-
cpu
+關(guān)注
關(guān)注
68文章
11277瀏覽量
224954 -
圖像處理
+關(guān)注
關(guān)注
29文章
1342瀏覽量
59507 -
線(xiàn)程
+關(guān)注
關(guān)注
0文章
509瀏覽量
20825 -
HarmonyOS
+關(guān)注
關(guān)注
80文章
2153瀏覽量
36041
發(fā)布評(píng)論請(qǐng)先 登錄
鴻蒙OS開(kāi)發(fā)實(shí)例:【ArkTS類(lèi)庫(kù)多線(xiàn)程CPU密集型任務(wù)TaskPool】
鴻蒙原生應(yīng)用開(kāi)發(fā)-ArkTS語(yǔ)言基礎(chǔ)類(lèi)庫(kù)多線(xiàn)程CPU密集型任務(wù)TaskPool
鴻蒙原生應(yīng)用開(kāi)發(fā)-ArkTS語(yǔ)言基礎(chǔ)類(lèi)庫(kù)多線(xiàn)程I/O密集型任務(wù)開(kāi)發(fā)
請(qǐng)問(wèn)如何在Python中實(shí)現(xiàn)多線(xiàn)程與多進(jìn)程的協(xié)作?
CPU密集型任務(wù)開(kāi)發(fā)指導(dǎo)
計(jì)算密集型的程序簡(jiǎn)析
HarmonyOS如何使用異步并發(fā)能力進(jìn)行開(kāi)發(fā)
HarmonyOS CPU與I/O密集型任務(wù)開(kāi)發(fā)指導(dǎo)
HarmonyOS語(yǔ)言基礎(chǔ)類(lèi)庫(kù)開(kāi)發(fā)指南上線(xiàn)啦!
I/O密集型虛擬機(jī)的域間通信優(yōu)化方法
FPGA執(zhí)行計(jì)算密集型任務(wù)性能表現(xiàn)及優(yōu)勢(shì)有哪些
云優(yōu)化性能:使用基于閃存的存儲(chǔ)的I/O密集型工作負(fù)載
鴻蒙OS開(kāi)發(fā)實(shí)例:【ArkTS類(lèi)庫(kù)多線(xiàn)程I/O密集型任務(wù)開(kāi)發(fā)】
HarmonyOS CPU與I/O密集型任務(wù)開(kāi)發(fā)指導(dǎo)
評(píng)論