Yuchuan Weng.Miko Tech-Blog.

Java8与并发

2017/12/28 Share

自2014年Oracle发布Java8新版本后,对于Java来说这显然是一个具有里程碑意义的版本,它最主要的改进是增加了函数式编程,就目前而言最令人头疼的问题就是Java那繁琐的语法,不得不使我们要花费大量的代码行数去实现一些司空见惯的功能。这一切都在Java8函数式编程中。

Java8函数式编程简介

函数式编程与面向对象设计方法在思路和手段上各有千秋,在这里我们简要介绍一下函数式编程于面向对象相比较的一些特点和差异。

函数作为一等公民

JS是我们常见的语言,我们可以把它称之为多范式语言,既可以作为不是很严格的面向对象语言,也可以当做函数式语言。一个显著特点是函数可以作为参数传入,另一个显著特点是函数可以作为另一个函数的返回值。

无副作用

函数的副作用指的是函数在调用过程中除了给出返回值以外还修改了函数外部状态。比如函数在调用过程中修改了某一全局状态,函数式编程认为函数的副作用应该尽量被避免。因为当一个变量被改变后我们很难想象是哪一个函数修改的。如果函数都是显式函数,那么函数必然不会被外部或者全局信息影响,对调查和排错是非常有益的。
注意:显示函数是指与外界交换数据的唯一渠道就是参数和返回值,显示函数不会去读取或者修改函数的外部状态 与之相对的是隐式函数,隐式函数除了参数和返回值外还会读取外部信息或者修改外部信息。
(一般编程语言中都允许副作用的存在)这种函数调用的副作用咋子函数式编程里需要更有效地去限制。

申明式的

函数式编程是申明式的编程方式,相对于命令式而言呢,命令式喜欢使用大量的指令或者可变对象,我们总是习惯于创建对象或者变量,并且修改它们的状态值,或者喜欢提供一系列的指令来要求程序执行。函数式编程你只需要提供你的要求,申明你的用意即可。

1
2
3
4
int arr[] = {123,215,532,2,14,152,3};
Arrays.sort(arr);
//申明式编程 区别于传统命令式编程
Arrays.stream(arr).forEach(System.out::println);

可以看出循环体消失了,对应的操作封装在程序库中。

不变对象

函数式编程中,几乎所有对象都不会被轻易的修改,有点类似不变模式。

1
2
3
4
//函数式编程特性之 不变对象 将所有数组元素+1并返回
Arrays.stream(arr).map((x)->x=x+1).forEach(System.out::println);
System.out.println("原数组值不会改变:");
Arrays.stream(arr).forEach(System.out::println);

利于并行

由于函数式编程不改变对象原有的值,那么我们就不用担心并发下会把这个对象”写坏”,由于对象是不变的 我们不需要进行任何同步操作,这样不仅有利于并行化同时在并行化后由于没有同步锁机制,其性能也会更好。

更少的代码

函数式编程更加简明扼要,类似于Clojure语言(一种运行于JVM的函数式编程语言)。例如:判断一个数组的奇数还是偶数,奇数+1 否则不操作输出数组:

1
2
3
System.out.println("操作3:");
//判断数组每一个元素 如果为偶数则不变 奇数+1
Arrays.stream(arr).map((x)->(x%2==0?x:x+1)).forEach(System.out::println);

只需要一行代码搞定。

函数式编程基础

FunctionalInterface 注释

Java8提供函数式接口的概念,简单来说就是只定义了单一抽象方法的接口:比如

1
2
3
4
5
6
7
8
9
10
11
12
/**
* FuntionalInterface 用于表明该接口为一个函数式接口,该接口定义只包含一个抽象方法handle()
* 如果一个函数满足函数式接口定义那么即使不标注编译器仍然会将其视为函数式接口,标注了
* 编译器就会进行检测,如不符合规范(例如有2个抽象方法)则编译报错。
* 这点类似于Override注解
* @author 圈圈
* @version 1.0
*/
@FunctionalInterface
public static interface IntHandler{
void handle(int i);
}

这里需要注意的是, 函数式接口需要满足接口只有一个抽象方法的规范,可以有多个默认方法 (Java8新增的接口默认方法以及被java.lang.Object 实现的方法)。
如下是一个完全符合规范的函数式接口:

1
2
3
4
5
@FunctionalInterface
public interface IntHadnler {
void handle(int i);
boolean equals(Object obj);
}

以下不是函数式接口 equals已经在Object实现:
1
2
3
4
@FunctionalInterface
public interface IntHadnler {
boolean equals(Object obj);
}

接口默认方法

Java8之前的版本,接口只能包含抽象方法。自java8之后 接口也可以包含若干个实例方法(不需要实现)。

1
2
3
4
5
6
7
8
@FunctionalInterface
public interface IntHadnler {
void handle(int i);
boolean equals(Object obj);
default void run(){
System.out.println("Running...");
}
}

如果接口继承中都有相同的default方法的话 调用的时候 编译器会抛出一个错误。所以需要指明。
SuperInterfaceName.super.run(); 这样调用
Java8中Comparator比较器引入了若干个默认方法,先按照字符串长度排序:在按照大小写不敏感的字母顺序排:

1
2
Conmparator<String> cmp = Comparator.comparingInt(String::length)
.thenComparing(String,CASE_INSENSITIVE_ORDER);

Lambda表达式

ambda表达式可以说是函数式编程的核心。lambda表达式极大地增强了Java语言的表达能力,lambada表达式即匿名函数,它是一段没有函数名的函数体,可以作为参数直接传递给相关的调用者。

1
2
3
4
5
6
7
//函数式 lambda表达式
//默认传入的变量都是final的
final int num =2 ;
//
Function<Integer, Integer> stringConvert = (from) -> from*num;
//输出4
System.out.println("lambda result ="+stringConvert.apply(2));

方法引用

方法引用是Java8提出来用于简化Lambda表达式的一种手段,他通过类名和方法名来定位到一个静态方法或者实例方法。
方法引用在Java8中使用也非常灵活,总的来说分为以下几种:

  • 静态方法引用 : ClassName::methodName
  • 实例上的实例方法引用 : InstanceName::methodName
  • 父类的实例方法引用 : super::methodName
  • 类型的实例方法引用 : ClassName::methodName
  • 构造方法引用 : Class::new
  • 数组构造方法引用 : TypeName[]::new

基本归类为 方法名或者实例名::方法名,构造函数则为 Class::new。

1
2
3
4
5
6
7
//lambda表达式方法引用
List<Student> list = new ArrayList<Student>();
for(int i=0;i<=10;i++){
list.add(new Student("张三"+i, i));
}
//lambda表达式调用实例方法
list.stream().map(Student::getName).forEach(System.out::println);

调用构造方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* 一般来说如果使用静态方法或者调用明确,流内元素会自动作为调用目标
* 因此如果一个class存在同名的实例方法和构造方法的话编译器就会感到很困惑。
*/
//函数式调用构造函数
UserFactory<Student> usr = Student::new;
Student stuu = usr.create("123", 2);
System.out.println("调用 构造函数生成的对象:"+stuu.getName());


/**
* 定义函数式接口 Student::new构建对象会根据函数签名去寻找对应的构造方法
* @author 圈圈
* @version 1.0
* @param <U>
*/
@FunctionalInterface
interface UserFactory<U extends Student>{
U create(String name,int id);
}

一步一步走入函数式编程

就以遍历数组为例,如果使用Java8中流 可以这么做:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
* java8函数式编程
* @author 圈圈
* @version 1.0
*/
public class Java8FunctionHandler {
public static void main(String[] args) {
int arr[] = {12,42,125,1255,234,1};
/**
* 函数式编程 Arrays.stream(arr)返回一个流对象 类似于集合数组
* 它的foreach方法它接受一个IntConsumer接口 因为这个流是IntStream 也是装有Integer元素的流
* 它还接受DoubleStream LongStream等对象流 类型取决于它传入的参数
*/
System.out.println("函数式编程遍历:");
Arrays.stream(arr).forEach(new IntConsumer() {
@Override
public void accept(int value) {
// TODO Auto-generated method stub
System.out.print(value+"、");
}
});
//但是上述代码好像看起来还是很冗长 ,既然是value是推导出来的 那么我们就简化
System.out.println("\n简化遍历:");
/**
* 此时已经很清晰简洁了吧
* lambda表达式由“->”分割 左半部分表示参数 右半部分表示实现体(执行体)
* 因此我们可以简单理解lambda表达式只是匿名对象实现的一种新的方式
*/
Arrays.stream(arr).forEach((x)->System.out.print(x+"、"));
//还可以无须声明 继续简化 直接::方法引用静态方法输出
System.out.println("继续简化");
Arrays.stream(arr).forEach(System.out::println);
}
}

用lambda表达式不仅可以简化匿名类的编写,与接口默认方法相结合,顺畅的流式API对各种组件的自由装配。下面例子展示了两次输出,一次输出到标准错误,一次输出到标准输出。
使用流式API andthen():
1
2
3
4
5
//一次输出到标准错误,一次输出到标准输出。
IntConsumer out = System.out::println;
IntConsumer err = System.err::println;
//andThen将两者按照时间顺序分开先后
Arrays.stream(arr).forEach(out.andThen(err));

我们看看intConsumer源码:

1
2
3
4
default IntConsumer andThen(IntConsumer after) {
Objects.requireNonNull(after);
return (int t) -> { accept(t); after.accept(t); };
}

返回一个新的IntConsumer 先调用第一个 再调用第二个IntConsumer 从而实现多个处理器的集合 这种操作手法在函数式编程中极其常用。

并行流与并行排序

Java8中,可以在接口不变的情况下,将流改为并行流。这样可以很自然地使用多线程进行集合中的处理。

使用并行流过滤数据

这里展示一个求指定数据范围内质数的例子 首先我们需要一个判定质数的方法:

1
2
3
4
5
6
7
8
9
10
11
public static boolean isPrime(int num){
if(num<2){
return false;
}
for(int i =2;Math.sqrt(num)>=i;i++){
if(num%i==0){
return false;
}
}
return true;
}

函数式编程指定 range范围 去调用静态函数 filter流过滤出所有的质数 并 count累加:

1
2
long count = IntStream.range(2, 1000000).filter(ConcurrentStream::isPrime).count();
System.out.println("2-1000000的所有质数累加为:"+count);

并行程序,只需要将流并行化即可,parallel就让程序以多线程的形式去跑,应用于流内所有元素。
1
2
count=IntStream.range(1, 1000000).parallel().filter(ConcurrentStream::isPrime).count();
System.out.println("多线程的count:"+count);

从集合到并行流

在函数式编程中 我们可以从集合得到一个流或者并行流,下面这段代码试图统计集合内所有学生的平均分:

1
2
3
4
5
6
7
8
9
10
11
12
13
List<Student> list = new ArrayList<Student>();
for(int i=0;i<100;i++){
Student stu = new ConcurrentStream().new Student("Jack"+i,new Random().nextInt(100));
list.add(stu);
}
//统计总分
long count2 = list.stream().mapToLong((x)->x.scores).sum();
//非并行统计平均分
double avgScores = list.stream().mapToInt((x)->x.scores).average().getAsDouble();
System.out.println("100个学生的总分是:"+count2+"平均分是"+avgScores);
//并行执行
avgScores = list.parallelStream().mapToInt((x)->x.scores).average().getAsDouble();
System.out.println("100个学生的总分是:"+count2+" 并行平均分是"+avgScores);

并行排序

除了并行流以外,有串行的Arrays.sort(),Java8 有并行的 Arrays.parallelSort() 这类并行排序。除此之外 还提供了赋值接口
public static void setAll(int[] array,IntUanryOperator generator)
//并行对应 parallerSetAll(...)
这个接口是函数式意味很浓的接口,它的第二个参数是一个函数式接口,如果我们想给每个数组元素附上一个随机值,那么可以:

1
2
3
4
5
6
7
8
//Java8 数组排序  && Arrays.setAll赋值
int[] arr = new int[100];
Random r=new Random();
//串行赋值setAll 并行赋值parallelSetAll
Arrays.parallelSetAll(arr, (x)->r.nextInt());
System.out.println("并行排序后数组输出:");
Arrays.parallelSort(arr);
Arrays.stream(arr).forEach((x)->System.out.print(x+"、"));

增强的Future:CompletableFuture

Java8中新增了一个超大型工具类,为什么说它大呢,因为它不但实现了Future接口,还实现了CompletionStage接口,CompletionStage接口拥有多达40种方法,是的你没看错 这不符合面向对象设计原则的“单一职责原则(单方法接口)” 。但是它也就这么存在了,通过ComletionStation接口 我们可以在一个执行结果上进行多次的流式调用,以此可以得到最终结果。
completableFuture和Future一样,具有完成好了再通知的能力,作为函数调用契约,如果你向其请求一个数据如果数据还没有准备好请求线程就会等待,通过CompletableFuture我么可以手动设置CompletableFuture的完成状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class CompletableFutureDemo implements Runnable{
CompletableFuture<Double> future = null;
public CompletableFutureDemo(CompletableFuture<Double> future) {
super();
this.future = future;
}



@Override
public void run() {
double result = 0;
try {
//如无数据会一直阻塞在这里
result = future.get() * future.get();
} catch (InterruptedException | ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
//取到数据后阻塞解除
System.out.println("结果为: result="+result);
}


public static void main(String[] args) throws Exception {
ExecutorService pool = Executors.newFixedThreadPool(100);
final CompletableFuture<Double> com = new CompletableFuture<Double>();
for(int i=0;i<50;i++){
pool.execute(new CompletableFutureDemo(com));
}
//启动50个线程后
//模拟主线程自己工作5秒
Thread.sleep(5000);
//赋值数据 50个线程解除阻塞 result = 20*20
com.complete(20.0);
}

CompletableFuture异步执行任务
通过CompletableFuture的封装,我们可以进一步地实现Future模式那样的异步调用。代码:

1
2
3
4
5
6
7
8
9
10
public static int call(int num) {
//模拟一个长时间的执行
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return num*num;
}

main方法 lambda表达式异步调用 supplyAsync():
1
2
final CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(()->call(40));
System.out.println("异步调用完成返回 40^2="+future2.get());

查看这异步方法的类似方法有:

1
2
3
4
5
6
7
8
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
return asyncSupplyStage(asyncPool, supplier);
}
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
Executor executor)
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable,
Executor executor)

其中supplyAsync适用于有返回值的场景,而runAsync适用于无返回值的场景。
它们都有一个重载方法携带一个线程池参数,可以指定在哪个线程池中工作,如果不指定就在默认的系统公共的线程池ForkJoinPool.common()
注意:ForkJoinPool.commonPool()方法是Java8中新增的线程池方法,它获得一个公共的线程池方法,但是这些都是精灵线程,如果主线程退出他们也退出。
CompletionStage的流式调用
前文已经提到,CompletionStage的40个接口就是为函数式编程做准备的,在这里我们看看如何进行函数式API的调用(以刚才的方法为例):

1
2
3
4
5
6
7
//转变不同的类型 所以返回类型为void
final CompletableFuture<Void> future3 = CompletableFuture.supplyAsync(()->call(40))
.thenApply((i) -> Integer.toString(i))
.thenApply((str)->"66"+str+"66")
.thenAccept(System.out::println);
//如果不写get等待它执行完的话 主线程执行完上面代码就会立即退出(异步调用)
future3.get();

CompletableFuture中的异常处理

在CompletableFuture在执行过程中遇到异常,我们可以使用函数式编程风格优雅地进行处理,exceptionally():

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//异常处理 如果不会发生异常 则往下走 如果发生异常 就进入异常处理流式代码块并返回
final CompletableFuture<Void> future4 = CompletableFuture.supplyAsync(()->excepDemo(20))
.exceptionally(ex -> {
//异常走这里 直接返回
System.out.println(ex.toString());
return 0;
}).thenApply((str)->Integer.toString(str))
.thenAccept(System.out::println);
future4.get();
}

//伪造个异常
public static Integer excepDemo(int x){
return x/0;
}

为此我们将看到如下输出:

java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero 0

CATALOG
  1. 1. Java8函数式编程简介
    1. 1.1. 函数作为一等公民
    2. 1.2. 无副作用
    3. 1.3. 申明式的
    4. 1.4. 不变对象
    5. 1.5. 利于并行
    6. 1.6. 更少的代码
  2. 2. 函数式编程基础
    1. 2.1. FunctionalInterface 注释
    2. 2.2. 接口默认方法
    3. 2.3. Lambda表达式
      1. 2.3.1. 方法引用
    4. 2.4. 一步一步走入函数式编程
    5. 2.5. 并行流与并行排序
      1. 2.5.1. 使用并行流过滤数据
    6. 2.6. 从集合到并行流
    7. 2.7. 并行排序
    8. 2.8. 增强的Future:CompletableFuture
      1. 2.8.1. CompletableFuture中的异常处理