본문 바로가기

BackEnd/Linux

Linux / network - 리눅스기초를 활용한 데이터 통신 8(sharedmemory, semaphore)

안녕하세요 인포돈 입니다.

본 내용은 우분투를 기본으로 작성되었습니다.
Cloud Computing을 활용하여 서버를 구축하였습니다.

 

3개의 클라이언트 공유 메모리를 활용한 데이터 변환

본 포스팅의 목적은 3개의 클라이언트에서 받은 값들을 rotation 즉, 초당 회전을 시켜서 계속해서 클라이언트에 보내주는 프로그램을 작성해 보려 합니다. 이해가 어려우신 분들을 위해서 이번에는 실행화면을 먼저 띄어 드리겠습니다.

서버 실행화면
클라이언트 실행화면

 보시는 실행화면과 같이 앞의 string은 한 칸씩 옆으로 이동하고 뒤의 int는 1씩 증가하는 프로그램입니다. 본 프로그램은 pipe를 사용하기보다는 shared memory를 활용해서 사용하는 방법에 대해서 다루어 보겠습니다.


 - Shared Memory란?

 한 프로세스에 해당하는 메모리는 해당 프로세스만이 사용하는 게 일반적이다. 그러나 타 프로세스에서 해당 프로세스의 데이터를 사용하고 싶을 때 사용하는게 바로 공유 메모리입니다. 특정 메모리를 공유하면 데이터를 더 빠르게 접근할 수 있기 때문에 프로그램을 더 효율적으로 만들 수 있습니다.

 

 공유 메모리는 개인적으로 아래의 형식으로 사용됩니다.

int shmdt(const void *shmaddr);
// shmat에서 전달 받은 포인터를 전달하면된다. 성공시 0 실패시 -1 반환

int shmctl(int shmid, int cmd, struct shmid_ds *buf)
// shmid : 공유메모리의 id를 의미합니다.
// cmd : 제어할 일종의 컨트롤러인데, 주어진 컨트롤러를 용도에 맞게 입력하시면 됩니다.
// buf : 구조체로 정의 되어 있는데 shared memory에서 정의된 구조체 입니다.

정확한 파라미터의 의미나, 구조체의 의미를 이해하고 싶다면 아래 링크를 참고해 주세요(저도 참고했었습니다.)
https://reakwon.tistory.com/96

 

[리눅스] 공유메모리 (Shared Memory) 개념과 예제(shmget, shmat, shmdt, shmctl)

공유메모리(Shared Memory) 프로세스에서 메모리는 해당 프로세스만이 사용하는게 일반적입니다. 메모리에는 명령어, 지역 변수, 동적 변수, 전역 변수와 같이 데이터가 존재하는데 그 프로세스만

reakwon.tistory.com

 -  Semaphore

 갑자기 세마포? 이게 왜 나오는지 궁금하실 수 있습니다. 앞서서 저희는 공유 메모리가 여러 프로세스가 접근 가능한 메모리 영역이라고 배웠습니다. 그런데 여러 프로세스가 공유 메모리의 데이터를 수정한다면, 아래와 같은 오류가 생길 수 있습니다.

 

 프로세스 A가 공유 메모리의 X의 데이터를 수정하려고 한다. 프로세스 A에는 X = X + 1을 하려고 한다.

 프로세스 B는 공유 메모리의 X의 데이터를 수정하려고 한다. 프로세스 B에는 X = X + 2를 하려고 한다.

 

그런데 이때 2개의 프로세스가 동시에 공유 메모리에 접근하여 데이터를 가져가게 된다면, 우리는 11 또는 12의 값이 공유 메모리에 저장될 것이다.

 

그렇다면 우리가 얻으려는 13의 값을 얻을 수 없게 됩니다.

 

 이러한 모순을 없애기 위해 제약사항을 걸어줘야 하는데 바로 다른 프로세스가 공유메모리에 접근하여 데이터를 수정하고 있다면, 다른 프로세스가 공유 메모리에 일시적으로 접근하지 못하도록 제어하는 게 바로 세마포입니다.

(물론 세마포 외에다 다른 방법도 다수 존재합니다.)

 

int semget(key_t key, int nsems, int semflg);
// key 세마포어를 식별하는 키
// nsems : 세마포어 자원의 갯수를 의미
// semflg : 세마포어의 동작 옵션이다. (IPC_CREAT, IPC_EXCL 두개 존재)
//  호출 성공시 SEMID라는 세마포어 식별자를 반환한다.

int semctl(int semid, int semnum, int cmd, ...)
// semid : 세마포어의 식별자로 앞선 semget에서부터 나온 id
// semnum : 세마포어에서 사용되는 인덱스이다.
// cmd : 세마포어를 제어할 수 있는 command이다.
// union semun : 추가 제어 커맨드 입니다.

서버 코드

#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include <arpa/inet.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/shm.h>
#include <sys/sem.h>
#include <sys/ipc.h>

#define MAXBUF 1024

union semun{
        int val;
};

int main(int argc, char **argv)
{
        int server_sockfd, client_sockfd;
        int client_len, n, cnt = 0;
        char buf[MAXBUF];
        char msg[MAXBUF];
        struct sockaddr_in clientaddr, serveraddr;
        client_len = sizeof(clientaddr);
        int pid;
        int pid_s;

        if ((server_sockfd = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP )) == -1)
        {
                perror("socket error : ");
                exit(0);
        }
        memset(&serveraddr, 0x00, sizeof(serveraddr));
        serveraddr.sin_family = AF_INET;
        serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
        serveraddr.sin_port = htons(atoi(argv[1]));

        bind (server_sockfd, (struct sockaddr *)&serveraddr, sizeof(serveraddr));
        listen(server_sockfd, 5);

        memset(msg, 0x00, MAXBUF);

        while(1)
        {
                client_sockfd = accept(server_sockfd, (struct sockaddr *)&clientaddr, &client_len);
                printf("New Client[%d] Connect: %s port : %d\n",cnt, inet_ntoa(clientaddr.sin_addr),ntohs(clientaddr.sin_port));

                pid = fork();

                if(pid  == 0){
                        printf("자식 시작 %d \n",cnt);
                        close(server_sockfd);
                        memset(buf, 0x00, MAXBUF);

                        //client read
                        n = read(client_sockfd, buf, MAXBUF);
                        buf[n]= '\0';
                        char str[100];
                        int in;

                        char *temp = strtok(buf,"/");
                        strcat(str,temp);

                        for(int i = 0 ; str[i] != 0 ; i ++){
                                if(str[i]==0){
                                        str[i] = 0;
                                        break;
                                }
                        }
                        temp = strtok(NULL,"/");
                        in = atoi(temp);

                        printf("Read Data Client[%d] : %s(%d) : %d and %s \n",cnt,inet_ntoa(clientaddr.sin_addr),
                        ntohs(clientaddr.sin_port),in,str);
                        //shared memory
                        int shmid[3], semid[3];
                        int *cal_num[3];
                        void *shared_memory[3];
                        shared_memory[cnt] = NULL;

                        union semun sem_union[3];
                        struct sembuf semopen[3] = {{0,-1,SEM_UNDO},{0,-1,SEM_UNDO},{0,-1,SEM_UNDO}};
                        struct sembuf semclose[3] = {{0,1,SEM_UNDO},{0,1,SEM_UNDO},{0,1,SEM_UNDO}};

                        semid[cnt] = semget((key_t)3477+cnt,1,IPC_CREAT|0666);
                        shmid[cnt] = shmget((key_t)1234+cnt, sizeof(int), 0666|IPC_CREAT);
                        shared_memory[cnt] = shmat(shmid[cnt],NULL,0);
                        cal_num[cnt] = (int*)shared_memory[cnt];

                        pid_s = fork();

                        if(pid_s == 0){ // consumer
                                while(1){
                                        semop(semid[cnt],&semopen[cnt],1);	//cs begin
                                        sleep(1);
                                        in = *cal_num[cnt];
                                        char t[1000];
                                        strcat(t,str);
                                        strcat(t,"/");

                                        char tt[1000];
                                        sprintf(tt,"%d",in);
                                        strcat(t,tt);

                                        //send msg to client
                                        write(client_sockfd,t,sizeof(t));
                                        t[0] = '\0';

                                        //string rotation
                                        char imsi=str[strlen(str)-1];
                                        for(int j = strlen(str)-1 ; j >= 0 ; j--){
                                                str[j] = str[j-1];
                                        }
                                        str[0]=imsi;

                                        semop(semid[cnt],&semclose[cnt],1);	//cs end
                                }
                        }else if(pid_s > 0){ //producer

                                sem_union[cnt].val = in;
                                semctl(semid[cnt], 0, SETVAL,sem_union[cnt]);
                                *cal_num[cnt] = in;
                                while(1){
                                        semop(semid[cnt],&semopen[cnt],1);	//cs begin
                                        sleep(1);
                                        *cal_num[cnt] = *cal_num[cnt] + 1;
                                        semop(semid[cnt],&semclose[cnt],1);	//cs end
                                }
                        }

                        close(client_sockfd);
                        return 0;
                }
                else if(pid > 0 ){
                        close(client_sockfd);
                        cnt++;
                }
        }
        close(server_sockfd);
        return 0;
}

 

서버 코드가 굉장히 긴데 일반적인 socket관련 사항은 생략하고 중요한 shared memory에 대해서 집중적으로 고려해 보겠습니다. (socket 관련 설명은 이전 포스팅을 참고해주세요)

 

내용에 들어가기에 앞서서 간략히 프로그램 흐름을 고려하고 가야 합니다.

* 서버를 오픈한다.

* 각 클라이언트를 오픈한 후 서버에 접속한다.

* 서버에서는 fork를 통해 자식 프로세스를 만들어 클라이언트의 입력을 기다린다.

* 부모 서버는 다시 클라이언트의 접속을 기다린다.

* 각 클라이언트는 stringint 입력을 넣어준다.

* 넣어준 입력은 “/” 토큰으로 구별하도록 하고 하나의 msg에 담아 서버에 보내준다.

* 각 자식 프로세스는 “/”을 활용하여 stringint입력을 구분한다.

* 각 자식 프로세스는 string은 오른쪽으로 회전, int+1의 값을 설정해 주어 다시 클라이언트에게 보낸다.

* 각 클라이언트는 자식 프로세스에서 받은 내역을 계속 출력한다.

 

 - 자식 프로세스 클라이언트에 받은 입력 전처리

//client read
n = read(client_sockfd, buf, MAXBUF);
buf[n]= '\0';
char str[100];
int in;

char *temp = strtok(buf,"/");
strcat(str,temp);

for(int i = 0 ; str[i] != 0 ; i ++){
    if(str[i]==0){
         str[i] = 0;
         break;
    }}
temp = strtok(NULL,"/");
in = atoi(temp);

해당 코드는 사실 크게 중요한 코드는 아닙니다. 클라이언트에 입력받은 string과 int를 '/'를 구분자로 하여 나누어서 저장해 주는 코드입니다. 중간 for반복문은 해당 문장의 크기를 알 수 없기에 입력받은 값의 끝을 알려주기 위해서 사용했습니다. (사용하지 않으면 쓰레기 값이 같이 출력되어 알아볼 수 없는 글씨가 나옵니다.)

 

 - 공유 메모리, 세마포 세팅

//shared memory
int shmid[3], semid[3];
int *cal_num[3];
void *shared_memory[3];
shared_memory[cnt] = NULL;

union semun sem_union[3];
struct sembuf semopen[3] = {{0,-1,SEM_UNDO},{0,-1,SEM_UNDO},{0,-1,SEM_UNDO}};
struct sembuf semclose[3] = {{0,1,SEM_UNDO},{0,1,SEM_UNDO},{0,1,SEM_UNDO}};

semid[cnt] = semget((key_t)3477+cnt,1,IPC_CREAT|0666);
shmid[cnt] = shmget((key_t)1234+cnt, sizeof(int), 0666|IPC_CREAT);
shared_memory[cnt] = shmat(shmid[cnt],NULL,0);
cal_num[cnt] = (int*)shared_memory[cnt];

 크게 어려울 것 없습니다. 우리는 3개의 클라이언트를 각 공유 메모리를 활용해서 해당 값을 공유해야 되니 3개의 공유 메모리를 만들어주고 그에 맞게 3개의 세마포를 생성해 줍니다.

 

 - comsumer

 pid_s = fork();

if(pid_s == 0){ // consumer
    while(1){
        semop(semid[cnt],&semopen[cnt],1);	//cs begin
        sleep(1);
        in = *cal_num[cnt];
        char t[1000];
        strcat(t,str);
        strcat(t,"/");

        char tt[1000];
        sprintf(tt,"%d",in);
        strcat(t,tt);

        //send msg to client
        write(client_sockfd,t,sizeof(t));
        t[0] = '\0';

        //string rotation
        char imsi=str[strlen(str)-1];
        for(int j = strlen(str)-1 ; j >= 0 ; j--){
            str[j] = str[j-1];
        }
        str[0]=imsi;

        semop(semid[cnt],&semclose[cnt],1);	//cs end
}}

semop는 앞서 말한 consumer가 해당 공유 메모리에 접근하려 할 때 다른 프로세스가 접근하지 못하도록 합니다. 이후 int 값과 회전한 string 값을 더해서 클라이언트에 보내 줍니다. 물론 클라이언트에 보내줄 때도 '/' 구분자를 활용해 보내줍니다.

 

 (cs란 critical session이라는 뜻으로 공유하는 데이터를 접근할 때 다른 프로세스나 스레드가 접근하지 못하도록 하는 코드 구역을 의미합니다.)

 

 - producer

else if(pid_s > 0){ //producer
    sem_union[cnt].val = in;
    semctl(semid[cnt], 0, SETVAL,sem_union[cnt]);
    *cal_num[cnt] = in;
    while(1){
        semop(semid[cnt],&semopen[cnt],1);	//cs begin
        sleep(1);
        *cal_num[cnt] = *cal_num[cnt] + 1;
        semop(semid[cnt],&semclose[cnt],1);	//cs end
}}

producer에서는 별거 없습니다. 앞선 int값을 + 1만 해주어 다시 공유 메모리에 올려주는 역할을 담당합니다.


클라이언트 코드

#include <sys/socket.h>  /* 소켓 관련 함수 */
#include <arpa/inet.h>   /* 소켓 지원을 위한 각종 함수 */
#include <sys/stat.h>
#include <stdio.h>      /* 표준 입출력 관련 */
#include <string.h>     /* 문자열 관련 */
#include <unistd.h>     /* 각종 시스템 함수 */
#include <stdlib.h>

#define MAXLINE    1024

int main(int argc, char **argv)
{
    struct sockaddr_in serveraddr;
    int server_sockfd;
    int client_len;
    char buf[MAXLINE];
    char msg[MAXLINE];
    int n;

    if ((server_sockfd = socket(AF_INET, SOCK_STREAM, 0)) == -1)
    {
        perror("error :");
        return 1;
    }

    /* 연결요청할 서버의 주소와 포트번호 프로토콜등을 지정한다. */
    server_sockfd = socket(AF_INET, SOCK_STREAM, 0);
    serveraddr.sin_family = AF_INET;
    serveraddr.sin_addr.s_addr = inet_addr("127.0.0.1");
    serveraddr.sin_port = htons(3600);

    client_len = sizeof(serveraddr);

    /* 서버에 연결을 시도한다. */
    if (connect(server_sockfd, (struct sockaddr *)&serveraddr, client_len)  == -             1)
    {
        perror("connect error :");
        return 1;
    }
    memset(msg, 0x00, MAXLINE);
    memset(buf, 0x00, MAXLINE);
    printf("input string : ");
    scanf("%s", buf);    /* 키보드 입력을 기다린다. */
    //printf("string buf : %s\n", buf);

    for(int i = 0 ; buf[i] != 0 ; i ++){		//문자열의 마지막을 표시하기 위함
      if(buf[i] == '\n'){
       buf[i] = 0;
       break;
      }
    }

    strcat(msg,buf);
    strcat(msg,"/");

    memset(buf, 0x00, MAXLINE);
    printf("\ninput int : ");
    scanf("%s", buf);     //키보드 입력을 기다린다.

    for(int i = 0 ; buf[i] != 0 ; i++){	//문자열의 마지막을 표시하기 위함
      if(buf[i] == '\n'){
       buf[i] = 0;
       break;
      }
    }
    strcat(msg,buf);

    if (write(server_sockfd, msg, MAXLINE) <= 0) /* 입력 받은 데이터를 서버로 전             송한다. */
    {
        perror("write error : ");
        return 1;
    }
    memset(buf, 0x00, MAXLINE);
    /* 서버로 부터 데이터를 읽는다. */
   while(1){
    if (n = read(server_sockfd, buf, MAXLINE) <= 0)
    {
        perror("read error : ");
        return 1;
    }
    for(int i = 0 ;  buf[i] != 0 ; i ++){	//문자열의 마지막을 표시하기 위함
     if(buf[i]==0){
      buf[i] = 0;
      break;
     }
    }
    char str[1000];
    int in;
    char *ptr = strtok(buf,"/");		//토큰 분리
    strcpy(str,ptr);
    for(int i = 0 ; str[i] != 0 ; i++){	//문자열의 마지막을 표시하기 위함
     if(str[i]==0){
      str[i]=0;
      break;
     }
    }
    ptr = strtok(NULL,"/");
    in = atoi(ptr);

    printf("read : %s and %d \n", str, in);
    str[0] = '\n';			//str 중복을 없애기 위해서
    memset(buf,0x00,MAXLINE);		//buf 중복을 없애기 위해서
    }

    close(server_sockfd);
    return 0;
}

사실 뭐 클라이언트는 크게 다를 게 없습니다. 단 '/' 구분자를 활용해 서버에 데이터를 넘겨주고 받을 때 또한 출력을 할 때 '/'를 고려하여 출력하면 됩니다.


 사실 이번 과제 해결 과정에서 가장 중요하게 생각했던 점이 바로 공유 메모리를 활용한 프로세스 간의 데이터 통신이었다. 그러나 문제를 해결하고 보니 int 데이터만 공유 데이터를 활용했다는 것을 알 수 있었다. 이러한 간단한 주고받기는 기본적인 내용이었지만, 보다 과제에 충실하게 구현하기 위해서는 *cal_num처럼 클라이언트에게 받은 string 데이터를 공유 메모리에 저장하는 형식을 고려했어야 했다. 이를 위해서 간략히 방식을 고쳐보면 문자열 데이터를 포인터 형식으로 선언하여 입력받은 데이터를 해당 공유메모리에 저장하고 producercs begin 부분을 시작할 때 string 데이터를 회전시키는 코드를 넣어주면 된다.