본문 바로가기

BackEnd/Linux

Linux / network - 리눅스기초를 활용한 데이터 통신 9(멀티 쓰레드, Mutex, cond_wait, string회전)

안녕하세요 인포돈 입니다.

본 내용은 우분투를 기본으로 작성되었습니다.
Cloud Computing을 활용하여 서버를 구축하였습니다.

스레드를 활용한 데이터 통신 (클라이언트 3, 서버 1)

 본 포스팅에서는 이전에 프로세스들 간에 데이터를 통신하는 방법에 대해서 다뤄왔다. 그러나 현실적으로는 프로세스를 통해서 다루기보다는 한 프로세스에 여러 개의 스레드를 활용하는 방법이 더욱 대중적이다. 따라서 본 포스팅에서는 스레드를 활용하여 데이터를 통신하는 방법에 대해서 다뤄본다. (본격적인 내용에 앞서서 간단한 이론들을 살펴본다)

 

 - 스레드란

 스레드를 학습해본 사람은 스레드는 프로세스 내에서 실제로 일을 하는 주체를 의미한다고 알고 있다. 그러나 처음 접해보는 사람은 이해하기가 힘들 수 있다. 따라서 쓰레드를 좀 더 쉽게 이해해본다면, 우리가 게임을 하나 틀었다고 가정하자 그렇다면 우리 컴퓨터는 1개의 프로세스를 작동한 것이다. 이때 해당 게임에는 하나의 프로세스가 전담하여 프로그램을 구동하게 되는데 이때 게임에서는 거래도 해야 되며, 몬스터도 잡아야 되며, 채팅도 해야 하며, 그래픽 또한 띄어주어야 한다. 하나의 게임에도 한 프로세스가 감당하기 힘든 많은 기능들을 수행해야 한다. 이를 좀 더 비동기적으로 실행하기 위해서 (비동기적이란 동시에 일을 하는 것, 동기식이란 순차적으로 처리하는 것) 스레드에 각 할 일을 주는 것이다. 이때 각 스레드는 각자 채팅, 거래, 사냥 등의 임무를 맡아서 소비자가 원하는 기능을 실질적으로 수행하게 된다.

 

 - Producer와 Consumer

하나의 일정 패턴이라고 생각하는 게 편하다. 이는 멀티 스레드 프로그램에서 자주 사용되는 패턴으로 생성자는 클라이언트에게 데이터를 받아오는 역할을 소비자는 받은 데이터를 처리하는 일을 하게 된다. 이렇게 나누는 이유는 각 스레드가 감당할 수 있는 부하를 조절할 수 있기 때문입니다. 아래 그림에서 보면 Thread1이 생산자, thread2가 소비자가 될 수 있지요

 - 프로그램 흐름도

이번에 구현한 프로그램의 흐름도를 생각하면 아래와 같다.

1) 각 클라이언트에게 string과 int내용을 입력받는다.

2) 해당 내용을 서버로 전송한다.

3) 서버에서 하나의 클라이언트에 2개의 스레드(생산자, 소비자)를 할당한다.

4) 생산자 스레드에서 클라이언트의 입력 내용을 받아서 string은 회전, int는 +1을 해준다.

5) 소비자 스레드에서 클라이언트에게 변환된 값을 돌려보내 준다.


서버 코드

#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include <arpa/inet.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/shm.h>
#include <sys/sem.h>
#include <sys/ipc.h>
#include <pthread.h>

#define MAXBUF 1024

pthread_mutex_t mutex;
pthread_cond_t cond;
char send_msg[MAXBUF];

int cnt = 0;

void * producer_thread(void *data){
        int sockfd = *((int *)data);
        int readn;

        socklen_t addrlen;
        char buf[MAXBUF];
        struct sockaddr_in client_addr;
        memset(buf, 0x00, MAXBUF);
        addrlen = sizeof(client_addr);
        getpeername(sockfd, (struct sockaddr *)&client_addr, &addrlen);

        //데이터 읽어오기
        readn = read(sockfd, buf, MAXBUF);
        buf[readn] = '\0';

        char temp_str[100];
        int temp_num;

        char *temp = strtok(buf,"/");
        strcat(temp_str, temp);

        for(int i = 0 ; temp_str[i] != 0 ; i++){
                if(temp_str[i] == 0){
                        temp_str[i] = 0;
                        break;
                }
        }

        temp = strtok(NULL,"/");
        temp_num = atoi(temp);

        printf("Read Data Client[%d] : %s(%d) : %d and %s \n",cnt, inet_ntoa(client_addr.sin_addr),ntohs(client_addr.sin_port),temp_num,temp_str);

        int flag = cnt;
        cnt += 1;
        //읽어온 데이터 나누기
        while(1){
                pthread_mutex_lock(&mutex);
                pthread_cond_signal(&cond);
                //str데이터 회전하기
                char imsi=temp_str[strlen(temp_str)-1];
                for(int i = strlen(temp_str)-1 ; i >= 0 ; i--){
                        temp_str[i] = temp_str[i-1];
                }
                temp_str[0] = imsi;

                //num 데이터 증가하기
                temp_num += 1;

                //클라이언트에게 보낼 메시지
                char t[1000];
                strcat(send_msg,temp_str);
                strcat(send_msg,"/");
                sprintf(t,"%d",temp_num);
                strcat(send_msg,t);
                pthread_mutex_unlock(&mutex);
                sleep(1);
        }
}
void * consumer_thread(void *data){

        int sockfd = *((int *)data);
        int readn;
        socklen_t addrlen;
        char buf[MAXBUF];
        struct sockaddr_in client_addr;
        memset(buf, 0x00, MAXBUF);
        getpeername(sockfd, (struct sockaddr *)&client_addr, &addrlen);
        int flag = cnt;
        while(1){
                //printf("123123 \n");
                pthread_mutex_lock(&mutex);
                pthread_cond_wait(&cond, &mutex);
                write(sockfd,send_msg,MAXBUF);
                //sleep(1);
                send_msg[0] = '\0';
                pthread_mutex_unlock(&mutex);
        }
}
int main(int argc, char **argv)
{
        int server_sockfd, client_sockfd;
        int client_temp[3];
        int client_len, n, cnt = 0;
        char buf[MAXBUF];
        char msg[MAXBUF];
        struct sockaddr_in clientaddr, serveraddr;
        client_len = sizeof(clientaddr);
        pthread_t thread_id, thread_id1;

        if ((server_sockfd = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP )) == -1)
        {
                perror("socket error : ");
                exit(0);
        }
        memset(&serveraddr, 0x00, sizeof(serveraddr));
        serveraddr.sin_family = AF_INET;
        serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
        serveraddr.sin_port = htons(atoi(argv[1]));

        bind (server_sockfd, (struct sockaddr *)&serveraddr, sizeof(serveraddr));
        listen(server_sockfd, 5);

        memset(msg, 0x00, MAXBUF);

        while(1)
        {
                client_sockfd = accept(server_sockfd, (struct sockaddr *)&clientaddr, &client_len);

                if(client_sockfd == -1){
                        printf("accept error \n");
                }else{
                        printf("before server cnt : %d \n",cnt);
                        pthread_create(&thread_id, NULL, producer_thread, (void *)&client_sockfd);
                        pthread_create(&thread_id1, NULL, consumer_thread, (void *)&client_sockfd);
                        printf("after server cnt : %d \n",cnt);
                        pthread_detach(thread_id);
                        pthread_detach(thread_id1);
                        cnt += 1;
                }//else end
                }
        close(server_sockfd);
        return 0;
}

해당 서버 코드가 굉장히 길어 보이지만 나누어서 바라본다면, 사실 크게 어려울 게 없는 코드이다. 물론 socket에 관한 기본적인 생성과정은 생략한다. (이에 대한 내용은 앞선 포스팅을 참고) 

 

Linux / network - 리눅스기초를 활용한 데이터 통신 5(서버, 클라이언트, 소켓통신, 데이터 합치기)

안녕하세요 인포돈입니다. 이번 포스팅부터 본격적인 리눅스 소켓 통신 코딩을 적어보겠습니다. 본 포스터에서는 기본적으로 널리 알려져 있는 기본 코드를 활용하여 클라이언트의 데이터를

infodon.tistory.com

 

 

 - main 코드 (프로세스)

while(1)
{
    client_sockfd = accept(server_sockfd, (struct sockaddr *)&clientaddr, &client_len);
    
    if(client_sockfd == -1){
        printf("accept error \n");
    }else{
        printf("before server cnt : %d \n",cnt);
        pthread_create(&thread_id, NULL, producer_thread, (void *)&client_sockfd);
        pthread_create(&thread_id1, NULL, consumer_thread, (void *)&client_sockfd);
        printf("after server cnt : %d \n",cnt);
        pthread_detach(thread_id);
        pthread_detach(thread_id1);
        cnt += 1;
    }//else end
}

메인 프로세스에서는 처음 클라이언트의 입력이 들어오면 해당 소켓 정보를 활용해서 pthread_create를 활용해서 해당 소켓의 정보를 각 소비자, 클라이언트에게 넘겨줍니다. 이때 pthread_detach의 경우 스레드를 분리시키는 역할로 일반적으로 스레드를 생성하여 사용하면 스레드가 종료되더라도 모든 자원들이 해제가 되지 않는다. 이때 detach는 스레드가 종료할 경우 모든 자원을 해제하게 해주는 메서드이다.

 

 - 생산자

void * producer_thread(void *data){
        int sockfd = *((int *)data);
        int readn;

        socklen_t addrlen;
        char buf[MAXBUF];
        struct sockaddr_in client_addr;
        memset(buf, 0x00, MAXBUF);
        addrlen = sizeof(client_addr);
        getpeername(sockfd, (struct sockaddr *)&client_addr, &addrlen);

        //데이터 읽어오기
        readn = read(sockfd, buf, MAXBUF);
        buf[readn] = '\0';

        char temp_str[100];
        int temp_num;

        char *temp = strtok(buf,"/");
        strcat(temp_str, temp);

        for(int i = 0 ; temp_str[i] != 0 ; i++){
                if(temp_str[i] == 0){
                        temp_str[i] = 0;
                        break;
                }
        }

        temp = strtok(NULL,"/");
        temp_num = atoi(temp);
        int flag = cnt;
        cnt += 1;
        
        //읽어온 데이터 나누기
        while(1){
                pthread_mutex_lock(&mutex);
                pthread_cond_signal(&cond);
                //str데이터 회전하기
                char imsi=temp_str[strlen(temp_str)-1];
                for(int i = strlen(temp_str)-1 ; i >= 0 ; i--){
                        temp_str[i] = temp_str[i-1];
                }
                temp_str[0] = imsi;

                //num 데이터 증가하기
                temp_num += 1;

                //클라이언트에게 보낼 메시지
                char t[1000];
                strcat(send_msg,temp_str);
                strcat(send_msg,"/");
                sprintf(t,"%d",temp_num);
                strcat(send_msg,t);
                pthread_mutex_unlock(&mutex);
                sleep(1);
        }
}

코드가 길어 보이지만 해당 코드는 크게 3가지로 나눌 수 있습니다. 1) 기본적으로 들어온 소켓의 정보를 변수에 할당해서 담아줍니다. 2) 소켓에 데이터 읽어올 준비가 끝났으면, 해당 값을 가져옵니다. (이때 들어오는 값은 '/'구분자로 인해서 string과 int를 구분했기 때문에 이에 대한 처리를 해줍니다.) 3) 이제 읽어온 값들을 string은 회전 int는 + 1을 해줍니다. 이후 다시 이 메시지들을 '/' 구분자를 활용해 합쳐줍니다.)

 

이 부분들은 모두 주석처리로 해당 위치를 표현해 줬습니다. (코드가 지저분한 건... ㅜㅜ)

 

 - 소비자

void * consumer_thread(void *data){

        int sockfd = *((int *)data);
        int readn;
        socklen_t addrlen;
        char buf[MAXBUF];
        struct sockaddr_in client_addr;
        memset(buf, 0x00, MAXBUF);
        getpeername(sockfd, (struct sockaddr *)&client_addr, &addrlen);
        int flag = cnt;
        while(1){
                //printf("123123 \n");
                pthread_mutex_lock(&mutex);
                pthread_cond_wait(&cond, &mutex);
                write(sockfd,send_msg,MAXBUF);
                //sleep(1);
                send_msg[0] = '\0';
                pthread_mutex_unlock(&mutex);
        }
}

소비자는 사실 더욱 간단합니다. 해당 메시지를 그냥 클라이언트에게 다시 돌려보내면 끝!

 

****

이때 항상 생각해야 되는 점들이 있습니다. 바로 pthread_mutex_lock과 pthread_cond_wait입니다.  공유하는 데이터에 읽고 쓰기게 동시에 이루어지게 된다면, condition race상태가 발생할 수 있습니다. (이에 대한 상황은 이전 포스트 확인) 이러한 상황을 방지하기 위해서 사용됩니다. 대표적으로 세마포도 예 로들 수 있습니다.

 

Linux / network - 리눅스기초를 활용한 데이터 통신 8(sharedmemory, semaphore)

안녕하세요 인포돈 입니다. 본 내용은 우분투를 기본으로 작성되었습니다. Cloud Computing을 활용하여 서버를 구축하였습니다. 3개의 클라이언트 공유 메모리를 활용한 데이터 변환 본 포스팅의 목

infodon.tistory.com

 또한, 앞서서 다루었던 프로세스와는 다르게 전역 변수로 선언한 send_msg를 활용하여 더욱 쉽게 데이터를 공유할 수 있다는 장점이 있습니다!


클라이언트 코드

#include <sys/socket.h>  /* 소켓 관련 함수 */
#include <arpa/inet.h>   /* 소켓 지원을 위한 각종 함수 */
#include <sys/stat.h>
#include <stdio.h>      /* 표준 입출력 관련 */
#include <string.h>     /* 문자열 관련 */
#include <unistd.h>     /* 각종 시스템 함수 */
#include <stdlib.h>

#define MAXLINE    1024

int main(int argc, char **argv)
{
    struct sockaddr_in serveraddr;
    int server_sockfd;
    int client_len;
    char buf[MAXLINE];
    char msg[MAXLINE];
    int n;

    if ((server_sockfd = socket(AF_INET, SOCK_STREAM, 0)) == -1)
    {
        perror("error :");
        return 1;
    }

    /* 연결요청할 서버의 주소와 포트번호 프로토콜등을 지정한다. */
    server_sockfd = socket(AF_INET, SOCK_STREAM, 0);
    serveraddr.sin_family = AF_INET;
    serveraddr.sin_addr.s_addr = inet_addr("127.0.0.1");
    serveraddr.sin_port = htons(3600);

    client_len = sizeof(serveraddr);

    /* 서버에 연결을 시도한다. */
    if (connect(server_sockfd, (struct sockaddr *)&serveraddr, client_len)  == -1)
    {
        perror("connect error :");
        return 1;
    }
    memset(msg, 0x00, MAXLINE);
    memset(buf, 0x00, MAXLINE);
    printf("input string : ");
    scanf("%s", buf);    /* 키보드 입력을 기다린다. */

    for(int i = 0 ; buf[i] != 0 ; i ++){
      if(buf[i] == '\n'){
       buf[i] = 0;
       break;
      }
    }

    strcat(msg,buf);
    strcat(msg,"/");
    //printf("string msg : %s\n", msg);

    memset(buf, 0x00, MAXLINE);
    printf("\ninput int : ");
    scanf("%s", buf);     //키보드 입력을 기다린다.

    for(int i = 0 ; buf[i] != 0 ; i++){
      if(buf[i] == '\n'){
       buf[i] = 0;
       break;
      }
    }
    //printf("int buf : %s\n", buf);
    strcat(msg,buf);
    //printf("int msg : %s\n", msg);

    if (write(server_sockfd, msg, MAXLINE) <= 0) /* 입력 받은 데이터를 서버로 전송한다. */
    {
        perror("write error : ");
        return 1;
    }
    memset(buf, 0x00, MAXLINE);
    /* 서버로 부터 데이터를 읽는다. */
   while(1){
    if (n = read(server_sockfd, buf, MAXLINE) <= 0)
    {
        perror("read error : ");
        return 1;
    };
    for(int i = 0 ; buf[i] != 0 ; i ++){
     if(buf[i]==0){
      buf[i] = 0;
      break;
     }
    }
    char str[1000];
    int in;
    char* ptr = strtok(buf,"/");
    strcpy(str,ptr);
    for(int i = 0 ; str[i] != 0 ; i++){
     if(str[i]==0){
      str[i]=0;
      break;
     }
    }
    ptr = strtok(NULL,"/");
    in = atoi(ptr);

    printf("read : %s and %d \n",str, in);
    str[0] = '\n';
    memset(buf,0x00,MAXLINE);
    }

    close(server_sockfd);
    return 0;
}

흠.... 클라이언트의 코드는 딱히 어려울 게 없습니다. 기본적인 소켓통신 세팅을 해준 뒤 사용자에게 string과 int의 값읍 입력받아 해당 값을 '/' 구별자로 서버측으로 보내 준뒤 다시 받아서 이를 출력해준다면 끝!


실행화면

서버 실행화면
클라이언트 실행화면