본문 바로가기
개발

Cassandra Pagination Problem

by ahoy~ 2023. 10. 30.

Webflux + cassandra-driver-mapping을 사용하고 있는데 full gc가 발생하였습니다.

non-blocking시스템이였기에 NIO 스레드가 blocking되는 것을 문제로 추측하였고 당시 인프라 문제로 인해 덤프는 뜨지 못하였기 때문에 휴리스틱하게 문제를 추측하였습니다.

그러다 카산드라에서 데이터를 가져오는 쪽에 의심이 될만한 부분을 찾았습니다.

Problem Statement

문제가 되는 코드입니다.

카산드라 라이브러리에서 사용하는 ResultSetFuture를 CompletableFuture로 변환하는 메서드 입니다.

ResultSetFuture resultSetFuture = session.executeAsync(ps);
CompletableFuture<List<T>> future = new CompletableFuture<>();
resultSetFuture.addListener(() -> {
	if(resultSetFuture.isDone()){
		ResultSet resultSet = resultSetFuture.getUninterruptibly();
        Result<T> result = mapper.map(uninterruptibly);
        List<T> elements = result.all();
        future.complete(all);
    }
};

언뜻 볼때는 문제가 없어보이지만 문제는 .all() 부분입니다.

ResultSetFuture는 ListenableFuture<ResultSet>인터페이스를 확장하고있고 ResultSet은 PagingIterable인터페이스를 확장하고 있습니다. ResultSetFuture의 Listener가 호출되는 시점은 첫 페이지 요청이 완료되었을 때 입니다. 하지만 Result.all()을 호출하게 되면 현재 스레드를 blocking하면서 모든 페이지들을 요청하게 됩니다.

따라서 현재 수행중인 스레드(ex. webflux의 netty eventloop스레드...)가 block이 되면서 GC가 발생한 것 이였습니다.

Solution

솔루션 코드는 https://docs.datastax.com/en/developer/java-driver/3.6/manual/async/ 에서 가져왔습니다.

Statement statement = new SimpleStatement("select * from foo").setFetchSize(20);
ListenableFuture<ResultSet> future = Futures.transform(
    session.executeAsync(statement),
    iterate(1));

private static AsyncFunction<ResultSet, ResultSet> iterate(final int page) {
    return new AsyncFunction<ResultSet, ResultSet>() {
        @Override
        public ListenableFuture<ResultSet> apply(ResultSet rs) throws Exception {

            // How far we can go without triggering the blocking fetch:
            int remainingInPage = rs.getAvailableWithoutFetching();

            System.out.printf("Starting page %d (%d rows)%n", page, remainingInPage);

            for (Row row : rs) {
                System.out.printf("[page %d - %d] row = %s%n", page, remainingInPage, row);
                if (--remainingInPage == 0)
                    break;
            }
            System.out.printf("Done page %d%n", page);

            boolean wasLastPage = rs.getExecutionInfo().getPagingState() == null;
            if (wasLastPage) {
                System.out.println("Done iterating");
                return Futures.immediateFuture(rs);
            } else {
                ListenableFuture<ResultSet> future = rs.fetchMoreResults();
                return Futures.transform(future, iterate(page + 1));
            }
        }
    };
}

가져온 ResultSet의 pagination기능을 사용하여 전체 fullyFetched되기 전까지 계속 페이지를 불러올 수 있도록 guava의 AsyncFunction과 Futures.transform을 사용하여 ResultSet을 ListenableFuture<ResultSet>으로 다시 바꿔줬습니다.

Listener(콜백) 재귀 로직

  1. 가져온 데이터 출력
  2. 마지막 페이지일 경우 즉시반환 future return
  3. 추가로 페이지가 있을 경우 Listener에 다음 페이지를 추가로 가져오는 ListenableFuture 전달

Lesson Learned

비동기 시스템을 사용할 때는 항상 blocking이 되지 않는지 확인하자.

댓글