閱讀599 返回首頁    go 阿裏雲 go 技術社區[雲棲]


Fork and Join: Java也可以輕鬆地編寫並發程序

資源下載:

Java SE 7

Sample Code(Zip)

如今,多核處理器在服務器,台式機及筆記本電腦上已經很普遍了,同時也被應用在更小的設備上,比如智能手機和平板電腦。這就開啟了並發編程新的潛力,因為多個線程可以在多個內核上並發執行。在應用中要實現最大性能的一個重要技術手段是將密集的任務分隔成多個可以並行執行的塊,以便可以最大化利用計算能力。

處理並發(並行)程序,一向都是比較困難的,因為你必須處理線程同步和共享數據的問題。對於java平台在語言級別上對並發編程的支持就很強大,這已經在Groovy(GPars), Scala和Clojure的社區的努力下得以證明。這些社區都盡量提供全麵的編程模型和有效的實現來掩飾多線程和分布式應用帶來的痛苦。Java語言本身在這方麵不應該被認為是不行的。Java平台標準版(Java SE) 5 ,和Java SE 6引入了一組包提供強大的並發模塊。Java SE 7中通過加入了對並行支持又進一步增強它們。

接下來的文章將以Java中一個簡短的並發程序作為開始,以一個在早期版本中存在的底層機製開始。在展示由Java SE7中的fork/join框架提供的fork/join任務之前,將看到java.util.concurrent包提供的豐富的原語操作。然後就是使用新API的例子。最後,將對上麵總結的方法進行討論。

在下文中,我們假定讀者具有Java SE5或Java SE6的背景,我們會一路呈現一些Java SE7帶來的一些實用的語言演變。

Java中普通線程的並發編程

首先從曆史上來看,java並發編程中通過java.lang.Thread類和java.lang.Runnable接口來編寫多線程程序,然後確保代碼對於共享的可變對象表現出的正確性和一致性,並且避免不正確的讀/寫操作,同時不會由於競爭條件上的鎖爭用而產生死鎖。這裏是一個基本的線程操作的例子:

    Thread thread = new Thread() {
         @Override
         public void run() {
              System.out.println("I am running in a separate thread!");
         }
    };
    thread.start();
    thread.join();

例子中的代碼創建了一個線程,並且打印一個字符串到標準輸出。通過調用join()方法,主線程將等待創建的(子)線程執行完成。
對於簡單的例子,直接操作線程這種方式是可以的,但對於並發編程,這樣的代碼很快變得容易出錯,特別是好幾個線程需要協作來完成一個更大的任務的時候。這種情況下,它們的控製流需要被協調。

例如,一個線程的執行完成可能依賴於其他將要執行完成的線程。通常熟悉的例子就是生產者/消費者的例子,因為如果消費者隊列是空的,那麼生產者應該等待消費者,並且如果生產者隊列是空的,那麼消費者也應該等待生產者。該需求可能通過共享狀態和條件隊列來實現,但是你仍然必須通過使用共享對象上的java.lang.Object.nofity()和java.lang.Object.wait()來實現同步,這很容易出錯。

最終,一個常見的錯誤就是在大段代碼甚至整個方法上使用synchronize進行互斥。雖然這種方法能實現線程安全的代碼,但是通常由於排斥時間太長而限製了並行性,從而造成性能低下。

在通常的計算過程中,操作低級原語來實現複雜的操作,這是對錯誤敞開大門。因此,開發者應該尋求有效地封裝複雜性為更高級的庫。Java SE5提供了那樣的能力。

java.util.concurrent包中豐富的原語

Java SE5引入了一個叫java.util.concurrent的包家族,在Java SE6中得到進一步增強。該包家族提供了下麵這些並發編程的原語,集合以及特性:

  • Executors,增強了普通的線程,因為它們(線程)從線程池管理中被抽象出來。它們執行任務類似於傳遞線程(實際上,是實現了java.util.Runnable的實例被封裝了)。好幾種實現都提供了線程池和調度策略。而且,執行結果既可以同步也可以異步的方式來獲取。
  • 線程安全的隊列允許在並發任務中傳遞數據。一組豐富的實現通過基本的數據結構(如數組鏈表,鏈接鏈表,或雙端隊列)和並發行為(如阻塞,支持優先級,或延遲)得以提供。
  • 細粒度的超時延遲規範,因為大部分java.util.concurrent包中的類都支持超時延遲。比如一個任務如果沒有在有限之間內完成,就會被executor中斷。
  • 豐富的同步模式超越了java提供的互斥同步塊。這些同步模式包含了常見的俗語,如信號量或同步柵欄。
  • 高效的並發數據集合(maps, lists和sets)通過寫時複製和細粒度鎖的使用,使得在多線程上下文中表現出卓越的性能。
  • 原子變量屏蔽開發者訪問它們時執行同步操作。這些變量包裝了通用的基本類型,比如Integers或Booleans,和對象引用。
  • 大量鎖超越了內部鎖提供的加鎖/通知功能,比如,支持重入,讀寫鎖,超時,或者基於輪詢的加鎖嚐試。

作為一個例子,讓我們想想下麵的程序:

注意:由於Java SE7引入了新的整數字麵值,下劃線可以在任何地方插入以提高可讀性(比如,1_000_000)。

import java.util.*;
import java.util.concurrent.*;
import static java.util.Arrays.asList;

public class Sums {

  static class Sum implements Callable<Long> {
     private final long from;
     private final long to;
     Sum(long from, long to) {
         this.from = from;
         this.to = to;
     }

     @Override
     public Long call() {
         long acc = 0;
         for (long i = from; i <= to; i++) {
             acc = acc + i;
         }
         return acc;
     }
  }

  public static void main(String[] args) throws Exception {
     ExecutorService executor = Executors.newFixedThreadPool(2);
     List <Future<Long>> results = executor.invokeAll(asList(
         new Sum(0, 10), new Sum(100, 1_000), new Sum(10_000, 1_000_000)
     ));
     executor.shutdown();

     for (Future<Long> result : results) {
         System.out.println(result.get());
     }
  }
}

這個例子程序利用executor來計算長整形數值的和。內部的Sum類實現了Callable接口,並被excutors用來執行結果計算,而並發工作則放在call方法中執行。java.util.concurrent.Executors類提供了好幾個工具方法,比如提供預先配置的Executors和包裝普通的java.util.Runnable對象為Callable實例。使用Callable比Runnable更優勢的地方在於Callable可以有確切的返回值。

該例子使用executor分發工作給2個線程。ExecutorService.invokeAll()方法放入Callable實例的集合,並且等待直到它們都返回。其返回Future對象列表,代表了計算的“未來”結果。如果我們想以異步的方式執行,我們可以檢測每個Future對象對應的Callable是否完成了它的工作和是否拋出了異常,甚至我們可以取消它。相比當使用普通的線程時,你必須通過一個共享可變的布爾值來編碼取消邏輯,並且通過定期檢查該布爾值來破壞該代碼。因為invokeAll()是阻塞的,我們可以直接迭代Future實例來獲取它們的計算和。

另外要注意executor服務必須被關閉。如果它沒有被關閉,主方法執行完後JVM就不會退出,因為仍然有激活線程存在。

Fork/Join 任務

概覽

Executors相對於普通的線程已經是一個很大的進步,因為executors很容易管理並發任務。有些類型的算法存在需要創建子任務,並且讓它們彼此通信來完成任務。這些都是”分而治之”的算法,也被稱為”map and reduce”,這是參考函數式編程的同名函數。想法是將數據區通過算法處理分隔為更小切獨立的塊,這是”map”階段。反過來,一旦這些塊被處理完成了,各部分的結果就可以收集起來形成最終的結果,這就是”reduce”階段。

一個簡單的例子想要計算出一個龐大的整形數組的和(如圖1)。由於加法是可交換的,可以拆分數組分更小的部分,並且用並發線程計算各部分和。各部分和可以被加來從而計算出總和。因為線程可以獨立對一個數組的不同區域使用這種算法操作。相比於單線程算法(迭代數組中每個整形),你將看到在多核架構中有了明顯的性能提升。

 

圖1:整形數組中的部分和

通過executors解決上麵的問題是很容易的:將數組分為n(可用的物理處理單元)部分,創建Callable實例來計算每一部分的和,提交它們到一個管理了n個線程的池中,並且收集結果計算出最終結果。

然而,對其他類型的算法和數據結構,其執行計劃並不是那麼簡單。特別是,識別出要以有效的方式被獨立處理的“足夠小”的數據塊的”map”階段並不能提前知道到數據空間的拓撲結構。基於圖和基於樹的數據結構尤為如此。在這些情況下,算法應該創建層級”劃分”,即在部分結果返回之前等待子任務完成,雖然在像圖1中的數組性能較差,但有好幾個並發部分和的計算的級別可以使用(比如,在雙核處理器上將數組分為4個子任務)。

為了實現分而治之算法的executors的問題是創建不相關的子任務,因為一個Callable是無限製的提交一個新的子任務給它的executors,並且以同步或異步的方式等待它的結果。問題是並行:當一個Callable等待另一個Callable的結果時,它就處於等待狀態,從而浪費了一個機會來處理隊列中等待執行的另一個Callable。

通過Doug Lea努力填補了這一缺陷,在Java SE7中,fork/join框架被加到了java.util.concurrent包中。java.util.concurrent的Java SE5和Java SE6版本幫助處理並發,並且Java SE7的添加則幫助處理並行。

添加支持並行

核心的添加是新的ForkJoinPool執行者,專門執行實現了ForkJoinTask接口的實例。ForkJoinTask對象支持創建子任務來等待子任務完成。有了這些清晰的語義,當一個任務正在等待另一個任務完成並且有待執行的任務時,executor就能夠通過”偷取”任務,在內部的線程池裏分發任務。

ForkJoinTask對象主要有兩個重要的方法:

  • fork()方法允許ForkJoinTask任務異步執行,也允許一個新的ForkJoinTask從存在的ForkJoinTask中被啟動。
  • 反過來, join()方法允許一個ForkJoinTask等待另一個ForkJoinTask執行完成。

如圖2所示,通過fork()和join()實現任務間的相互合作。注意fork()和join()方法名稱不應該與POSIX中的進程能夠複製自己的過程相混淆。fork()隻會讓ForkJoinPool調度一個新的任務,而不會創建子虛擬機。

 

圖2:Fork和Join任務間的協作

有兩種類型的ForkJoinTask的定義:

  • RecursiveAction的實例代表執行沒有返回結果。
  • 相反,RecursiveTask會有返回值。

通常,RecursiveTask是首選的,因為大部分分而治之的算法會在數據集上計算後返回結果。對於任務的執行,不同的同步和異步選項是可選的,這樣就可以實現複雜的模式。

例子:計算文檔中的單詞出現次數

為了闡述新的fork/join框架的使用,讓我們用一個簡單的例子(計算一個單詞在文檔集中的出現次數)。首先,也是最重要的,fork/join任務應該是純內存算法,而沒有I/O操作。此外,應該盡可能避免通過共享狀態來進行任務間的通信,因為這通常意味著加鎖會被執行。理想情況下,僅當一個任務fork另一個任務或一個任務join另一個任務時才進行任務通信。

我們的應用操作一個文件目錄結構並且加載每一個文件的內容到內存中。因此,我們需要下麵的類來表示模型。文檔表示為一些列行:

class Document {
    private final List<String> lines;

    Document(List<String> lines) {
        this.lines = lines;
    }

    List<String> getLines() {
        return this.lines;
    }

    static Document fromFile(File file) throws IOException {
        List<String> lines = new LinkedList<>();
        try(BufferedReader reader = new BufferedReader(new FileReader(file))) {
            String line = reader.readLine();
            while (line != null) {
                lines.add(line);
                line = reader.readLine();
            }
        }
        return new Document(lines);
    }
}

注意:如果你對Java SE7比較陌生,你應該會對fromFlie方法中的亮點感到驚訝:

  • LinkedList使用鑽石語法(<>)讓編譯器推斷出範型參數類型。因為lines是List<String>類型,所以LinkedList<>被擴展為LinkedList<String>。鑽石操作符使得範型處理更容易,其避免了重複類型,因為這些類型在編譯時就能被輕易的推斷出來。
  • try塊使用了自動資源管理的語言特性。任何實現了java.lang.AutoClosable的類都可以在try塊中打開。而不管是否有異常拋出,任何在try塊中聲明的資源將會在執行離開try塊時合理地關閉。在Java SE7之前,正確地關閉資源很快變成嵌套的if/try/catch/finally塊的一張噩夢,而且經常很難寫正確。

一個文件夾時一個簡單的基於樹的結構:

class Folder {
    private final List<Folder> subFolders;
    private final List<Document> documents;

    Folder(List<Folder> subFolders, List<Document> documents) {
        this.subFolders = subFolders;
        this.documents = documents;
    }

    List<Folder> getSubFolders() {
        return this.subFolders;
    }

    List<Document> getDocuments() {
        return this.documents;
    }

    static Folder fromDirectory(File dir) throws IOException {
        List<Document> documents = new LinkedList<>();
        List<Folder> subFolders = new LinkedList<>();
        for (File entry : dir.listFiles()) {
            if (entry.isDirectory()) {
                subFolders.add(Folder.fromDirectory(entry));
            } else {
                documents.add(Document.fromFile(entry));
            }
        }
        return new Folder(subFolders, documents);
    }
}

現在我們可以開始我們的主類了:

import java.io.*;
import java.util.*;
import java.util.concurrent.*;

public class WordCounter {

    String[] wordsIn(String line) {
        return line.trim().split("(\\s|\\p{Punct})+");
    }

    Long occurrencesCount(Document document, String searchedWord) {
        long count = 0;
        for (String line : document.getLines()) {
            for (String word : wordsIn(line)) {
                if (searchedWord.equals(word)) {
                    count = count + 1;
                }
            }
        }
        return count;
    }
}

occurrencesCount方法返回一個單詞在文檔中的出現次數,利用wordIn方法產生一行內的單詞組,它會基於空格或標點符號來分割每一行。
我們將實現兩種類型的fork/join任務。一個文件夾下的單詞出現次數就是該單詞在該文件夾下的所有的子文件夾和文檔中出現次數的總和。因此,我們將用一個任務計數在文檔中的出現次數和用另一個任務在文件夾下的計數,後者將forks子任務,然後將這些任務join起來,集合他們的結果。

依賴的任務關係很容易理解,如圖3所示,因為它直接映射底層文檔或文件夾樹結構。fork/join框架通過在等待一個任務執行文檔或文件夾單詞計數時可以通過join()同時執行一個文件夾任務,實現了並行最大化。

圖3:Fork/Join單詞計數任務

讓我們以DocumentSearchTask任務開始,它將計算一個文檔中單詞的出現次數:

class DocumentSearchTask extends RecursiveTask<Long> {
    private final Document document;
    private final String searchedWord;

    DocumentSearchTask(Document document, String searchedWord) {
        super();
        this.document = document;
        this.searchedWord = searchedWord;
    }

    @Override
    protected Long compute() {
        return occurrencesCount(document, searchedWord);
    }
}

因為我們的任務需要返回值,因此它們擴展自RecursiveTask類,由於出現次數用long值表示,所以用Long作為範型參數。compute()方法是RecursiveTask的核心,這裏的實現就簡單的委派給上麵的occurencesCount()方法。現在我們可以實現FolderSearchTask,該任務將對樹結構中的文件夾進行操作:

class FolderSearchTask extends RecursiveTask<Long> {
    private final Folder folder;
    private final String searchedWord;
    
    FolderSearchTask(Folder folder, String searchedWord) {
        super();
        this.folder = folder;
        this.searchedWord = searchedWord;
    }
    
    @Override
    protected Long compute() {
        long count = 0L;
        List<RecursiveTask<Long>> forks = new LinkedList<>();
        for (Folder subFolder : folder.getSubFolders()) {
            FolderSearchTask task = new FolderSearchTask(subFolder, searchedWord);
            forks.add(task);
            task.fork();
        }
        for (Document document : folder.getDocuments()) {
            DocumentSearchTask task = new DocumentSearchTask(document, searchedWord);
            forks.add(task);
            task.fork();
        }
        for (RecursiveTask<Long> task : forks) {
            count = count + task.join();
        }
        return count;
    }
}

該任務的compute()方法的實現簡單地對構造函數中傳遞的文件夾的每個元素fork出新的文檔或文件夾任務,然後join所有的計算出的部分和並返回部分和。

對於fork/join框架,我們現在缺少一個方法來引導單詞 計數操作和一個fork/join池執行者:

private final ForkJoinPool forkJoinPool = new ForkJoinPool();
Long countOccurrencesInParallel(Folder folder, String searchedWord) {
    return forkJoinPool.invoke(new FolderSearchTask(folder, searchedWord));
}

一個初始的FolderSearchTask引導了所有任務。ForkJoinPool的invoke方法允許等待計算的完成。在上麵的例子中,使用了ForkJoinPool的空構造函數,並行性將匹配硬件可用的處理器單元數(比如,在雙核處理器上該值為2)。

現在我們可以寫main()方法,通過命令行參數來獲得要操作的文件夾和搜索的單詞:

public static void main(String[] args) throws IOException {
    WordCounter wordCounter = new WordCounter();
    Folder folder = Folder.fromDirectory(new File(args[0]));
    System.out.println(wordCounter.countOccurrencesOnSingleThread(folder, args[1]));
}

此示例的完整源代碼還包括更傳統的,基於遞歸實現的相同算法,並運行在一個運行在單線程上:

Long countOccurrencesOnSingleThread(Folder folder, String searchedWord) {
    long count = 0;
    for (Folder subFolder : folder.getSubFolders()) {
        count = count + countOccurrencesOnSingleThread(subFolder, searchedWord);
    }
    for (Document document : folder.getDocuments()) {
        count = count + occurrencesCount(document, searchedWord);
    }
    return count;
}

討論

在來自Oracle的Sun Fire T2000服務器上進行了一個非正式的測試,該服務器可以規定Java虛擬機可用的處理器核數。上述的fork/join和單線程實例的不同版本都運行來查找JDK源代碼文件中import的出現次數。

這些不同版本都會運行好幾次,以確保Java虛擬機熱點優化有足夠的時間來執行。對於2, 4, 8和12核最佳的執行時間被收集了起來並且在加速,那就是說,這些比值(單線程耗時/fork-join耗時)被計算出來了。圖4核表1反應了其結果。

如你所見,隨著處理器核數極小的增長就實現了近似線性地加速,因為fork/join框架關心的是最大化並行。

表1:非正式的測試執行時間和加速

Number of Cores Single-Thread Execution Time (ms) Fork/Join Execution Time (ms) Speedup
2 18798 11026 1.704879376
4 19473 8329 2.337975747
8 18911 4208 4.494058935
12 19410 2876 6.748956885

圖4:隨著核數(水平軸)增長而加速(垂直軸)

我們也可以通過讓fork任務不操作在文檔級別,而是行級別來改進計算能力。這使得並發任務可能在同一文檔的不同行執行。然而這將是牽強的。事實上,fork/join任務應該執行足夠的計算量,以克服fork/join線程池或任務管理開銷。行級別的操作很瑣碎,反而影響方法的效率。

附帶的源代碼還有一個基於整型數組的歸並算法fork/join例子,有趣的是它使用RecursiveAction來實現的。fork/join任務在調用join()時不會返回值。而是,這些任務共享可變狀態:待排序的數組。實驗再次 表明隨內核數量增長,將實現近線性地加速。

總結

本文討論了Java中並發編程並強烈關注Java SE 7中所提供的新的fork / join任務,使得編寫並行程序更容易。這篇文章顯示,利用多核處理器,使用豐富的原語並組合它們編寫出高性能的程序,所有這些都無需處理線程的低級別操作和共享狀態同步。這篇文章通過一個既有吸引力又容易掌握的單詞計數例子闡釋了那些新APIs的使用,在非正式的測試中,隨處理器核數增長,獲得了接近線性的加速比。這些結果表麵fork/join框架是多麼有用。因為我們沒又更改代碼或調整Java虛擬機硬件的最大化核心利用率。

你也可以應用該技術到你的問題和數據模型。隻要你以”分而治之”的方式重寫你的算法來釋放I/O操作和鎖,你將看到明顯的變化。

鳴謝

作者要感謝Brian Goetz和邁克Duigou在早期的文章中有用的反饋修正。他還要感謝Scott Oaks and Alexis Moussine-Pouchkine,在合適的硬件上運行這些測試。

參考

JavSE Downloads: http://www.oracle.com/technetwork/java/javase/downloads/index.html
Sample Code: http://www.oracle.com/technetwork/articles/java/forkjoinsources-430155.zip
Java SE 7 API: http://download.java.net/jdk7/docs/api/
JSR-166 Interest Site by Doug Lea: http://gee.cs.oswego.edu/dl/concurrency-interest/
Project Coin: http://openjdk.java.net/projects/coin/
Java Concurrency in Practice by Brian Goetz, Tim Peierls, Joshua Bloch, Joseph Bowbeer, David Holmes and Doug Lea (Addision-Wesley Professional): 
http://www.informit.com/store/product.aspx?isbn=0321349601

Merge-sort algorithm: http://en.wikipedia.org/wiki/Merge_sort
Groovy: http://groovy.codehaus.org/
GPars: http://gpars.codehaus.org/
Scala: http://scala-lang.org
Clojure: http://clojure.org/

 

最後更新:2017-05-23 17:32:23

  上一篇:go  Java 7: 全麵教程-第一章節: Java初體驗
  下一篇:go  監聽器-java同步的基本思想