Language/Java

Java - Concurrent Programming (Runnable, Executor, Callable, Future 등)

JaeHoney 2022. 4. 3. 19:45

자바 Concurrent 프로그래밍

Concurrent S/W란 동시에 여러 작업을 할 수 있는 소프트웨어를 말합니다. 예를 들어, 크롬에서 무언가를 다운로드할 때 PC가 멈춰있는 게 아니라, 다른 작업을 병행할 수 있습니다.

 

자바에서는 멀티 프로세싱과 멀티쓰레드를 지원합니다. 우리가 잘 아는 스프링도 자바의 멀티쓰레드를 이용해서  많은 사용자들의 요청을 동시다발적으로 처리합니다.

 

Thread

Thread를 상속하면 새로운 Thread 클래스를 새롭게 정의할 수 있습니다. 

public class Main {

    public static void main(String[] args) {
        HelloThread helloThread = new HelloThread();
        helloThread.start();
    }

    static class HelloThread extends  Thread {
        @Override
        public void run() {
            System.out.println("Hello");
        }
    }
    
}

 

쓰레드를 여러 개 정의할 수도 있지만, 순서는 보장되지 않습니다. 아래의 코드의 경우에 helloThread2가 먼저 실행될 수도 있습니다.

HelloThread helloThread1 = new HelloThread();
helloThread1.start();
HelloThread helloThread2 = new HelloThread();
helloThread2.start();

많은 쓰레드(multi-thread)를 사용하게되면 쓰레드가 서로 값을 변경하지 않도록 주의해야 합니다.

 

Runnable

새로운 Thread 클래스를 정의하지 않고도 Runnable의 익명 객체를 구현해서 Thread를 사용할 수 있습니다.

Thread thread = new Thread(new Runnable() {
    @Override
    public void run() {
    	System.out.println("Hello");
    }
});
thread.start();

람다식을 사용하면 코드를 간추릴 수 있습니다.

Thread thread = new Thread(() -> System.out.println("hello"));
thread.start();

 

Excutor

ExcutorService는 Thread를 생성하고 처리하고 제거하는 등 병렬 프로그래밍을 위한 인터페이스입니다. ThreadPool을 구현하고 관리하는 역할을 합니다. submit()을 사용하면 작업을 ExecutorService가 만든 쓰레드풀에서 처리합니다. shutdown()은 쓰레드 풀을 종료합니다.

public class Main {

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        for(int i=0; i<10; i++) {
            executorService.submit(printMyThreadName());
        }
        executorService.shutdown();
    }

    private  static Runnable printMyThreadName() {
        return () -> System.out.println(Thread.currentThread().getName());
    }

}

3개의 쓰레드를 가진 쓰레드풀을 생성해서 printMyThreadName()이라는 작업을 10번을 처리했습니다.


Callable

Runnable은 정해진 일을 수행하는 작업밖에는 하지 못했습니다. 반환 값이 void타입이기 때문입니다.

CCallable은 Future로 매핑된 반환값을 가집니다. 그래서 메인쓰레드에서 반환 받은 Future타입 객체의 get함수로 서브쓰레드의 결과를 기다렸다가 반환받을 수 있습니다.

public class Main {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newSingleThreadExecutor();

        Callable<String> hello = () -> "Hello!";
        Future<String> future = executorService.submit(hello);
        
        try{
            System.out.println(future.get()); // Hello!
        } catch (Exception e) {
            throw e;
        }
        executorService.shutdown();
    }

}

Future의 isDone() 메서드로 쓰레드의 작업이 모두 실행되었는지 여부를 체크할 수도 있습니다.

future.isDone() // true OR false

cancel() 메서드를 사용하면 해당 쓰레드의 작업을 취소할 수도 있습니다!

future.cancel(true);

 

executorService.invokeAll()을 사용하면 Collections을 넘길 수 있습니다.

 

그러면 Thread pool에 있는 Callable 동작들을 각 Thread가 처리하고 모두 끝나면 결과를 List로 받을 수도 있습니다.

List<Future<String>> futures = executorService.invokeAll(callableList);

 

CompletableFuture

CompleteableFuture는 자바에서 비동기(Asynchronous) 프로그래밍을 가능하게 하기 위한 인터페이스입니다.

 

CompletableFuture의 장점은 아래와 같은 문제가 있었습니다.

  • Future를 외부에서 완료 시키거나 취소할 수 있다.
  • 작업이 끝났을 때 다른 작업을 실행하도록 처리할 수 있다. 
  • 여러 Future를 조합할 수 있다. (ex. 유저의 게시글을 가져오고 해당 게시물의 댓글 목록 가져오기)
  • 예외 처리용  API를 제공한다.

 

CompletableFuture를 Runnable처럼 return값 없이 사용할 때는 runAsync()를 사용합니다.

CompletableFuture<String> future = CompletableFuture.runAsync(() -> {
    System.out.println("Hello"); // "Hello"
});

반면, return값이 있게 사용하려면 supplyAsync를 사용하면 됩니다.

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    return "Hello!";
});
System.out.println(future.get()); // Hello!

 

future.complete()를 사용하면 반환 값을 지정해주고, Callable을 바로 완료처리할 수 있습니다.

CompletableFuture<String> future = new CompletableFuture<>();
future.complete("default");

System.out.println(future.get()); // "default"

여러 Future를 조합해서 사용할 수도 있습니다. thenApply / thenAcceps / thenRun 등

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    return "hello world";
}).thenApply((s) -> {
    return s + "!";
});

 

여기서 아까 학습했던 Executor를 사용해서, 해당 쓰레드에서 사용하게 하려면 runAsync()나 supplyAsync()의 두 번째 인자로 executorService를 주면 해당 쓰레드풀내에서 작업을 할당해서 처리합니다.

public static void main(String[] args) throws ExecutionException, InterruptedException {
    ExecutorService executorService = Executors.newFixedThreadPool(4);
    
    CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
        System.out.println(Thread.currentThread().getName());
    }, executorService).thenAccept((s) -> {
        System.out.println(Thread.currentThread().getName());
    });

    future.get();
    executorService.shutdown();
}

 

A쓰레드의 콜백작업을 다른 쓰레드에서 처리를 하고 싶다면, thenAccept() -> thenAcceptAsync()를 사용하고 두 번째 인자로 executorService를 주면 됩니다.

CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
    System.out.println(Thread.currentThread().getName());
}, executorService).thenAcceptAsync((s) -> {
    System.out.println(Thread.currentThread().getName());
}, executorService);

작업A의 콜백 작업B를 다른 쓰레드에서 처리합니다. 다른 executorService를 인자로 주면 다른 쓰레드풀에서 처리할 수도 있는데, 해당 과정은 생략하겠습니다.

앞서 말씀드렸듯이 CompletableFuture를 함수형 프로그래밍처럼 조립해서 사용할 수도 있습니다.

 

조합 - CompletableFuture

 

thenCompose()을 사용하면 CompletableFuture를 여러개 정의해서 

completableFutureA.thenCompose(completableFutureB);

 

thenCombine()을 사용하면 CompletableFuture를 결합할 수 있습니다.

CompletableFuture<Integer> total = future1.thenCombine(future2, (cf1, cf2) -> cf1 + cf2);

 

allOf()를 사용하면 인자로 준 작업들을 전부 포함해서 하나의 CompletableFutre로 만들 수 있습니다. 이 경우, 모든 completableFuture가 완료되어야 값을 반환합니다.

CompletableFuture<List<String>> result = CompletableFuture.allOf(futures)
        .thenApply(v -> futures.stream()
                    .map(CompletableFuture::get)
                    .collect(Collectors.toList()));

 

anyOf()를 사용해서 인자로 넘긴 작업 중 한가지라도 완료되면 그 작업의 결과를 반환할 수도 있습니다.

CompletableFuture.anyOf(futures)
        .thenAccept(System.out::println);

 

예외처리 - CompletableFuture

CompletableFuture는 예외 처리를 위한 API를 제공합니다. exceptionally() 메서드를 사용하면 예외를 catch해서 처리할 수 있습니다.

CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
    if (!isAvilable) {
        throw new IllegalArgumentException();
    }
    System.out.println("Job Complete!");
}).exceptionally(e -> {
    System.out.println("Error!");
    return null;
});

 

handle() 메서드를 사용하면 예외 발생 유무나, 발생한 예외 타입에 따른 처리를 구현할 수도 있습니다.

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    if (!isAvilable) {
        throw new IllegalArgumentException();
    }
    return "hello"
}).handle((result, error) -> {
    if(e != null) {
        System.out.println(e);
        return "Error!";
    }
    return result;
});

 

 


 

 

여기 까지 길고 두서없었지만, 관심있게 봐주셨다면 감사합니다.

 

Reference : 더 자바, Java 8 - 인프런 백기선님 강의
 

더 자바, Java 8 - 인프런 | 강의

자바 8에 추가된 기능들은 자바가 제공하는 API는 물론이고 스프링 같은 제 3의 라이브러리 및 프레임워크에서도 널리 사용되고 있습니다. 이 시대의 자바 개발자라면 반드시 알아야 합니다. 이

www.inflearn.com