ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Java] NIO 기반 입출력 및 네트워킹 - TCP 넌블로킹 채널
    CSE/Java 2015. 9. 6. 13:53
    NIO 기반 입출력 및 네트워킹은 여러 절로 구성되어 있습니다.







    7. TCP 넌블로킹 채널

     ServerSocketChannel, SocketChannel은 블로킹(Blocking) 방식도 지원하지만 넌블로킹(non-blocking) 방식도 지원합니다.


     이번 절엘서는 넌블로킹 방식의 특징과 넌블로킹의 핵심 객체인 셀렉터(Selector)를 이해하고 채널을 넌블로킹 방식으로 사용하는 방법에 대해 알아보도록 하겠습니다.



     



     7.1 넌블로킹 방식의 특징

      블로킹 방식은 언제 클라이언트가 연결 요청을 할지 모르기 때문에 accept() 에서 블로킹됩니다. 그리고 언제 클라이언트가 데이터를 보낼지 모르므로 read() 메소드는 항상 데이터를 받을 준비를 하기 위해 블로킹 됩니다. 


      그렇기 때문에 ServerSocketChannel과 연결된 SocketChannel 당 하나의 스레드가 할당되어야 합니다. 따라서 연결된 클라이언트가 많을수록 스레드의 수가 증가하고 서버에 심각한 성능 문제를 유발할 수도 있습니다. 이 문제를 해결하기 위해 지금까지는 스레드풀(ExecutorService)을 사용했었습니다.


      자바는 블로킹 방식의 또 다른 해결책으로 넌블로킹 방식을 지원하고 있습니다. 넌블로킹 방식은 connect(), accept(), read(), write() 메소드에서 블로킹이 없습니다. 클라이언트의 연결 요청이 없으면 accept()는 즉시 null을 리턴합니다. 그리고 클라이언트가 데이터를 보내지 않으면 read()는 즉시 0을 리턴하고, 파라미터로 전달한 ByteBuffer에는 어떤 데이터도 저장되지 않습니다. 넌블로킹 방식에서 다음 코드는 클라이언트가 연결 요청을 하지 않으면 무한 루프를 돌게 됩니다.



    1
    2
    3
    4
    5
    while (true) {
        SocketChannel socketChannel = serverSocketChannel.accept();
        ...
    }
     
    cs



      accept() 메소드가 블로킹되지 않고 바로 리턴되기 때문에 클라이언트가 연결 요청을 보내기 전까지 while 블록 내의 코드가 쉴새 없이 실행되어 CPU가 과도하게 소비되는 문제점이 발생합니다. 그래서 넌블로킹은 이벤트 리스너 역할을 하는 셀렉터(Selector)를 사용합니다. 넌블로킹 채널에 Selector를 등록해 놓으면 클라이언트의 연결 요청이 들어오거나 데이터가 도착할 경우, 채널은 Selector에 통보합니다. Selector는 통보한 채널들을 선택해서 작업 스레드가 accept() 또는 read() 메소드를 실행해서 즉시 작업을 처리하도록 합니다.


      Selector는 멀티 채널의 작업을 싱글 스레드에서 처리할 수 있도록 해주는 멀티플렉서(multiplexor) 역할을 합니다. 채널은 Selector에 자신을 등록할 때 작업 유형을 키(SelectionKey)로 생성하고, Selector의 관심키셋(interest-set)에 저장시킵니다. 클라이언트가 처리 요청을 하면 Selector는 관심키셋에 등록된 키 중에서 작업 처리 준비가 된 키를 선택된 키셋(selected-set)에 별도로 저장합니다. 그리고 작업 스레드가 선택된 키셋에 있는 키를 하나씩 꺼내어 키와 연관된 채널 작업을 처리하게 됩니다. 작업 스레드가 선택된 키셋에 있는 모든 키를 처리하게 되면 선택된 키셋은 비워지고, Selector는 다시 관심키셋에서 작업 처리 준비가 된 키들을 선택해서 선택된 키셋을 채웁니다.









      넌블로킹에서 작업 스레드를 꼭 하나만 사용할 필요는 없습니다. 채널 작업 처리 시 스레드풀을 사용할 수 있습니다. 작업 스레드가 블로킹되지 않기 때문에 적은 수의 스레드로 많은 양의 작업을 고속으로 처리할 수 있어 블로킹 방식보다는 서버의 성능이 향상될 수 있습니다.





     7.2 셀렉터 생성과 등록

      Selector는 정적 메소드인 open() 메소드를 호출하여 생성합니다. open() 메소드는 IOException이 발생할 수 있기 때문에 예외 처리가 필요합니다.



    1
    2
    3
    4
    5
    try {
        SocketChannel socketChannel = serverSocketChannel.accept();
        ...
    catch (IOException e) { }
     
    cs



     

      Selector에 등록할 수 있는 채널은 SelectableChannel의 하위 채널만 가능한데, TCP 통신에 사용되는 ServerSocketChannel, SocketChannel과 UDP 통신에 사용되는 DatagramChannel은 모두 selectableChannel의 하위 클래스이므로 Selector에 등록할 수 있습니다. 주의할 점은 넌블로킹으로 설정된 것만 가능합니다. 다음은 ServerSocketChannel을 넌블로킹으로 설정하는 코드입니다.



    1
    2
    3
    4
    ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
     
    serverSocketChannel.configureBlocking(false);
     
    cs

      



      다음은 SocketChannel을 넌블로킹으로 설정하는 코드입니다.



    1
    2
    3
    SocketChannel socketChannel = SocketChannel.open();
     
    socketChannel.configureBlocking(false);
    cs




      각 채널은 register() 메소드를 이용해서 Selector에 등록하는데, 첫 번째 파라미터는 Selector 이고 두 번째 파라미터는 채널의 작업 유형입니다.



    1
    2
    3
    4
    SelectionKey selectionKey = serverSocketChannel.register(Selector sel, int ops);
     
    SelectionKey selectionKey = socketChannel.register(Selector sel, int ops);
     
    cs




      다음은 두 번째 파라미터로 사용할 수 있는 작업 유형 별 SelectionKey의 상수들입니다.




     SelectionKey의 상수

     설명 

      OP_ACCEPT 

     ServerSocketChannel의 연결 수락 작업 

      OP_CONNECT 

     SocketChannel의 서버 연결 작업 

      OP_READ 

     SocketChannel의 데이터 읽기 작업 

      OP_WRITE 

     SocketChannel의 데이터 쓰기 작업 



      register()는 채널과 작업 유형 정보를 담고 있는 SelectionKey를 생성하고 Selector의 관심키셋에 저장한 후 해당 SelectionKey를 리턴합니다. 다음은 ServerSocketChannel이 Selector에 자신의 작업 유형을 등록하는 코드입니다.



    1
    2
    SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
     
    cs




      ServerSocketChannel은 클라이언트 연결 수락 작업을 하므로 작업 유형은 OP_ACCEPT로 지정합니다. 다음은 SocketChannel이 Selector에 자신의 작업 유형을 등록하는 코드입니다.



    1
    2
    3
    4
    5
    SelectionKey selectionKey = socketChannel.register(selector, SelectionKey.OP_CONNECT);
    SelectionKey selectionKey = socketChannel.register(SelectionKey.OP_READ);
    SelectionKey selectionKey = socketChannel.register(SelectionKey.OP_WRITE);
     
     
    cs




      SocketChannel의 작업은 세 가지인데, 서버 연결 요청 작업은 OP_CONNECT, 읽기 작업은 OP_READ, 쓰기 작업은 OP_WRITE로 지정합니다. 주의할 점은 동일한 SocketChannel로 두 가지 이상의 작업 유형을 등록할 수 없습니다. 즉, register()를 두 번 이상 호출할 수 없습니다. 등록은 한 번만 하되, 작업 유형이 변경되면 이미 생성된 SelectionKey를 수정해야 합니다.


      register()가 리턴한 SelectionKey는 작업 유형 변경, 첨부 객체 저장, 채널 등록 취소 등을 할 때 사용됩니다. SelectionKey를 별도로 관리할 필요는 없는데, 채널이 Selector를 등록하면 채널의 keyFor() 메소드로 SelectionKey를 언제든지 얻을 수 있기 때문입니다.


    1
    2
    SelectionKey key = socketChannel.keyFor(selector);
     
    cs








     7.3 선택된 키셋

      Selector를 구동하려면 select() 메소드를 호출해야 합니다. select() 는 관심키셋에 저장된 SelectionKey로부터 작업 처리 준비가 되었다는 통보가 올 때까지 대기(블로킹)합니다. 최소한 하나의 SelectionKey로부터 작업 처리 준비가 되었다는 통보가 오면 리턴합니다. 이때 리턴 값은 통보를 해온 SelectionKey의 수입니다. 다음은 세 가지 종류의 select() 메소드를 설명한 표입니다.




     리턴 타입

     메소드명(파라미터) 

     설명 

     int 

     select() 

     최소한 하나의 채널이 작업 처리 준비가 될 때까지 블로킹된다. 

     int 

     select(long timeout) 

     select()와 동일한데, 주어진 시간(밀리세컨드) 동안만 블로킹된다. 

     int 

     selectNow() 

     호출 즉시 리턴된다. 작업 처리 준비된 채널이 있으면 채널 수를 리턴하고, 없다면 0을 리턴한다. 




      주로 첫 번째 select()를 사용하는데, 이 메소드는 블로킹되므로 UI 및 이벤트를 처리하는 스레드에서 호출되면 안 되고, 별도의 작업 스레드를 생성해서 실행해야 합니다. select() 가 리턴하는 경우는 다음 세 가지 입니다.


      - 채널이 작업 처리 준비가 되었다는 통보를 할 때

      - Selector의 wakeup() 메소드를 호출할 때

      -select()를 호출한 스레드가 인터럽트될 때











      ServerSocketChannel은 작업 유형이 OP_ACCEPT 하나 밖에 없지만, SocketChannel은 상황에 따라서 읽기 작업과 쓰기 작업을 번갈아가며 작업 유형을 변경할 수 있습니다. 채널의 작업 유형이 변경되면 SelectionKey의 작업 유형을 interestOps(0 메소드로 변경해야 작업 스레드가 올바르게 채널 작업을 할 수 있습니다. 다음은 SelectionKey의 작업 유형을 OP_WRITE로 변경하는 코드입니다.



    1
    2
    3
    selectionKey.interestOps(SelectionKey.OP_WRITE);
    selector.wakeup();
     
    cs




      SelectionKey의 작업 유형이 변경되면 Selector의 wakeup() 메소드를 호출해서 블로킹되어 있는 select()를 즉시 리턴하고, 변경된 작업 유형을 감지하도록 select()를 다시 실행해야 합니다. 다음은 select() 메소드가 1 이상의 값을 리턴할 경우, selectedKeys() 메소드로 작업 처리 준비된 SelectionKey들을 Set 컬렉션으로 얻습니다. 이것이 선택된 키셋입니다.




    1
    2
    3
    4
    5
    6
    int keyCount = selector.select();
     
    if (keyCount > 0) {
        Set<SelectionKey> selectedKeys = selector.selectedKeys();
    }
     
    cs





      



     7.4 채널 작업 처리

      작업 스레드가 해야 할 일은 선택된 키셋에서 SelectionKey를 하나씩 꺼내어 작업 유형별로 채널 작업을 처리하는 것입니다. SelectionKey가 어떤 작업 유형인지 알아내는 방법은 SelectionKEy의 다음 메소드 중에서 어떤 것이 true를 리턴하는지 조사하면 됩니다.



     리턴 타입

     메소드명(파라미터) 

     설명 

     boolean 

     isAcceptable() 

     작업 유형이 OP_ACCEPT인 경우 

     boolean 

     isConnectable() 

     작업 유형이 OP_CONNECT인 경우 

     boolean 

     isReadable() 

     작업 유형이 OP_READ인 경우 

     boolean 

     isWritable() 

     작업 유형이 OP_WRITE인 경우 





      다음은 작업 스레드가 작업 유형별로 채널 작업을 처리하는 방법을 보여줍니다.



    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
     
     
        Thread thread = new Thread() {
            @Override
            public void run() {
                while (true) {
                    try {
                        int keyCount = Selector.select(); // 작업 처리 준비된 키 감지
                        
                        if (keyCount == 0) {
                            continue;
                        }
                        
                        Set<SelectionKey> selectedKeys = selector.selectedKeys(); // 선택된 키셋 얻기
                        Iterator<SelectionKey> iterator = selectedKeys.iterator();
                        
                        while (iterator.hasNext()) {
                            SelectionKey selectionKey = iterator.next();
                            
                            if (selectionKey.isAcceptable()) { /* 연결 수락 작업 처리 */ }
                            else if (selectionKey.isReadable()) { /* 읽기 작업 처리 */}
                            else if (selectionKey.isWritable()) { /* 쓰기 작업 처리 */ }
                            
                            iterator.remove();
                        }
                    } catch(Exception e) {
                        break;
                    }
                }
            }
        };
     
        thread.start();
    cs




      작업 스레드가 채널 작업을 처리하려면 채널 객체가 필요한데, SelectionKey의 channel() 메소드를 호출하면 얻을 수 있습니다. 다음은 작업 유형이 OP_ACCEPT일 경우, 연결 수락 작업 처리에서 필요한 ServerSocketChannel을 얻는 코드입니다.



    1
    2
    ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKEy.channel();
     
    cs




      작업 스레드가 채널 작업을 처리하다 보면 채널 객체 이외에 다른 객체가 필요할 수도 있습니다. 이러한 객체는 SelectionKey에 첨부해 두고 사용할 수 있습니다. SelectionKey의 attach() 메소드는 객체를 첨부하고, attachment() 메소드는 첨부된 객체를 얻을 때 사용합니다. 다음은 Client 객체를 SelectionKey에 첨부하고 얻어내는 코드입니다.



    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
     
     
        // 객체 첨부하기
        Client client = new Client(socketChannel);
        SelectionKey selectionKey = socketChannel.register(selector, SelectionKey.OP_READ);
        selectionKey.attach(client);
        
        // 첨부된 객체 얻기
        if (selectionKey.isReadable()) {
            Client client = (Client) selectionKey.attachment();
        }
        
    cs








     7.5 채팅 서버 구현

      TCP 블로킹에서 살펴봤던 채팅 서버를 넌블로킹으로 구현해보면서 셀렉터(Selector)와 넌블로킹 ServerSocketChannel 그리고 넌블로킹 SocketChannel이 어떻게 사용되는지 이해해봅시다.




       서버 클래스 구조


     다음은 서버 클래스의 구조를 보여줍니다.



      * NServerExam.java


    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    package pathexam;
     
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.util.List;
    import java.util.Vector;
     
    import javafx.application.Application;
    import pathexam.ServerExam.Client;
     
    public class NServerExam extends Application {
     
        Selector selector;
        ServerSocketChannel serverSocketChannel;
        List<Client> connections = new Vector<Client>();
     
        void startServer() {
     
        }
     
        void stopServer() {
     
        }
     
        void accept(SelectionKey selectionKey) {
     
        }
     
        class Client {
     
        }
     
        //////////////////////
        // UI 생성 코드
     
        public static void main(String[] args) {
     
        }
     
    }
     
     
     
    cs




       


       startServer() 메소드


    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
     
    void startServer() {
        try {
     
            selector = Selector.open(); // Selector 생성
            serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.configureBlocking(false); // 넌블로킹으로 설정
            serverSocketChannel.bind(new InetSocketAddress(5001));
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
     
        } catch (Exception e) {
            if (serverSocketChannel.isOpen()) {
                stopServer();
            }
     
            return;
        }
    }
    cs




      다음은 작업 처리 준비가 된 SelectionKey를 선택하고, 작업 유형별로 작업 스레드가 처리하는 코드입니다.



    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
        void startServer() {
            try {
     
                selector = Selector.open(); // Selector 생성
                serverSocketChannel = ServerSocketChannel.open();
                serverSocketChannel.configureBlocking(false); // 넌블로킹으로 설정
                serverSocketChannel.bind(new InetSocketAddress(5001));
                serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
     
            } catch (Exception e) {
                if (serverSocketChannel.isOpen()) {
                    stopServer();
                }
     
                return;
            }
            
            Thread thread = new Thread() {
                @Override
                public void run() {
                    while (true) {
                        try {
                            int keyCount = selector.select();
                            
                            if (keyCount == 0) {
                                continue;
                            }
                            
                            Set<SelectionKey> selectedKeys = selector.selectedKeys();
                            Iterator<SelectionKey> iterator = selectedKeys.iterator();
                            
                            while (iterator.hasNext()) {
                                SelectionKey selectionKey = iterator.next();
                                
                                if (selectionKey.isAcceptable()) {
                                    accept(selectionKey);
                                } else if (selectionKey.isReadable()) {
                                    Client client = (Client) selectionKey.attachment();
                                    client.receive(selectionKey);
                                } else if (selectionKey.isWritable()) {
                                    Client client = (Client) selectionKey.attachment();
                                    client.send(selectionKey);
                                }
                                
                                iterator.remove();
                            }
                            
                        } catch (Exception e) {
                            if (serverSocketChannel.isOpen()) {
                                stopServer();
                            }
                            break;
                        }
                    }
                }
            };
            
            thread.start();
            
            Platform.runLater(()->
                displayText("[서버 시작]")
                btnStartStop.setText("Stop");
            });
        }
    cs







       stopServer() 메소드



    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    void stopServer() {
        try {
            Iterator<Client> iterator = connections.iterator();
            
            while (iterator.hasNext()) {
                Client client = iterator.next();
                client.socketChannel.close();
                iterator.remove();
            }
            
            if (serverSocketChannel != null && serverSocketChannel.isOpen()) {
                serverSocketChannel.close();
            }
            
            if (selector != null && selector.isOpen()) {
                selector.close();
            }
            
            Platform.runLater(() -> {
                displayText("[서버 멈춤]");
                btnStartStop.setText("start");
            });
        } catch (Exception e) {
            
        }
    }
    cs






       accept(SelectionKey selectionKey) 메소드



    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    void accept(SelectionKey selectionKey) {
        try {
            ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel();
            SocketChannel socketChannel = serverSocketChannel.accept();
            
            String msg = "[연결 수락: " + socketChannel.getRemoteAddress() + ": " + Thread.currentThread().getName() + "]";
            Platform.runLater(() -> displayText(msg));
            
            Client client = new Client(socketChannel);
            connections.add(client);
            
            Platform.runLater(() -> displayText("[연결 개수: " + connections.size() + "]"));
            
        } catch (Exception e) {
            if (serverSocketChannel.isOpen()) {
                stopServer();
            }
        }
    }
    cs







       Client 클래스


    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    class Client {
        SocketChannel socketChannel;
        String sendData;
     
        Client(SocketChannel socketChannel) throws IOException {
            this.socketChannel = socketChannel;
     
            socketChannel.configureBlocking(false);
            SelectionKey selectionKey = socketChannel.register(selector, SelectionKey.OP_READ);
     
            selectionKey.attach(this);
        }
     
        void receive(SelectionKey selectionKey) {
            try {
                ByteBuffer byteBuffer = ByteBuffer.allocate(100);
     
                int byteCount = socketChannel.read(byteBuffer);
     
                if (byteCount == -1) {
                    throw new IOException();
                }
     
                String msg = "[요청 처리: " + socketChannel.getRemoteAddress() + ": " + Thread.currentThread().getName()
                        + "]";
     
                Platform.runLater(() -> displayText(msg));
     
                byteBuffer.flip();
                Charset charset = Charset.forName("UTF-8");
                String data = charset.decode(byteBuffer).toString();
     
                for (Client client : connections) {
                    client.sendData = data;
                    SelectionKey key = client.socketChannel.keyFor(selector);
                    key.interestOps(SelectionKey.OP_WRITE);
                }
                selector.wakeup();
     
            } catch (Exception e) {
                try {
                    connections.remove(this);
                    String msg = "[클라이언트 통신 안됨: " + socketChannel.getRemoteAddress() + ": "
                            + Thread.currentThread().getName() + "]";
     
                    Platform.runLater(() -> displayText(msg));
     
                    socketChannel.close();
                } catch (IOException e2) {
     
                }
            }
        }
     
        void send(SelectionKey selectionKey) {
            try {
                Charset charset = Charset.forName("UTF-8");
                ByteBuffer byteBuffer = charset.encode(sendData);
     
                socketChannel.write(byteBuffer);
                selectionKey.interestOps(SelectionKey.OP_READ);
                selector.wakeup();
            } catch (Exception e) {
                try {
                    String msg = "[클라이언트 통신 안됨: " + socketChannel.getRemoteAddress() + ": "
                            + Thread.currentThread().getName() + "]";
     
                    Platform.runLater(() -> displayText(msg));
                    connections.remove(this);
                    socketChannel.close();
                } catch (IOException e2) {
     
                }
            }
        }
    }
    cs






       UI 생성 코드


      UI 생성 코드는 TCP 블로킹 채널의 채팅 서버와 동일하므로 생략하겠습니다.




     7.6 채팅 클라이언트 구현

      채팅 클라이언트는 이전 블로킹 채널의 채팅 클라이언트를 그대로 사용하면 됩니다.

      











    * 이 포스트은 서적 '이것이 자바다' 를 참고하여 작성한 포스트입니다.

    댓글

Designed by Tistory.