シリアル通信で受信完了をできるだけ早く検知する方法 [Raspberry pi]

最近、ラズパイでシリアル通信をするプログラムを書いてる。いくつか詰まったところがあったので備忘録的に残しておこうと思う。

作成したプログラムは以下のリンクにある GitHub - bamchoh/rpi-serialport-test

TERMIOS 構造体

Linux で シリアル通信をするとなると tty 経由でやることになる。tty の設定は TERMIOS 構造体/関数群 でできるわけだが、ASCII ベースの通信の場合に良しなに動作させるための設定となるため、バイナリ通信の場合は、それら設定が邪悪に働く場合がある。例えば、TERMIOS 構造体の c_iflag の一つ IGNCR は、設定すると CR を無視するようになる。つまり ホストから 0x0d を送信しても ラズパイでは受信されない。また、c_oflagにも特定のコードを変換するフラグがある。なので、バイナリ通信をする場合 c_iflag と c_oflag は 0 を設定しておくのが無難だと思っている。今のところ 0 を設定していて困ったことはない。

カノニカルモード/非カノニカルモード

詳しい説明は TERMIOS の MANページ にあるので、そちらを読んでもらうとして、ざっくりとした説明としては、「端末は行単位で処理することが多いので、行入力が終わった段階で処理しよう。というのがカノニカルモード(だと思う。)」 ただ、私がやりたいのはバイナリ通信なので、今回は非カノニカルモードを選択する。非カノニカルモードを選択すると、read(2) の動作が c_cc[VMIN] と c_cc[VTIME] の設定によって、以下のように変わってくる。

  1. MIN が 0, TIME も 0 の場合
    データが利用可能であれば、 read(2) はすぐに返る。このときの返り値は、そのとき利用可能なバイト数か read(2) の要求バイト数のうち小さい方となる。 利用可能なデータがない場合 read(2) は 0 を返す。
  2. MIN が 1 以上, TIME が 0 の場合
    read(2) は、利用可能なデータが MIN バイトに達するまで停止する。返り値は最大でも要求バイト数である。
  3. MIN が 0, TIME が 1 以上
    TIME はタイマーの上限を規定し、単位は 1/10 秒である。 タイマーは read(2) が呼ばれた時点で開始される。 read(2) が返るのは、少なくとも 1バイトのデータが利用可能となった時点、 またはタイマーが時間切れとなった時点である。 入力が全くなくタイマーが時間切れとなった場合、 read(2) は 0 を返す。 read(2) の呼び出し時にすでに利用可能なデータがある場合、 その呼び出しは呼び出し直後にそのデータが到着したかのように振る舞う。
  4. MIN が 1 以上, TIME も 1 以上
    TIME はタイマーの上限を規定し、単位は 1/10 秒である。 入力の最初のバイトが利用可能になった後は、 新たに 1バイト受信する度にタイマーがリセットされる。 read(2) は以下の条件のいずれかを満たした場合に返る。

    1) MIN バイトのデータを受信した。
    2) バイト間タイマーが時間切れになった。
    3) read(2) で要求されたバイト数のデータを受信した (POSIX ではこの終了条件は規定されておらず、 他のいくつかの実装では read(2) はこの条件では返らない)。

    タイマーは最初のバイトが利用可能になった時点で開始されるので、 少なくとも 1 バイトは読み出される。 read() の呼び出し時にすでに利用可能なデータがある場合、 その呼び出しは呼び出し直後にそのデータが到着したかのように振る舞う。

今回は、データを受信しきったら速やかに次の処理に移行する必要があったため、read でブロックされることは避けたかった。とはいえ、ブロックしない 1 の場合、自前でポーリングループを作成する必要があり、CPUのパワーも使ってしまうので、それも避けたかった。折衷案として、3が候補になるが、タイムアウトが最短でも100msというのは遅すぎた。

受信完了

さて、先ほど「データを受信しきったら」と言ったが、何をもって受信しきったといえるだろうか?ファイルであれば、EOF(-1)ならデータを読み切ったといえるだろうし、ASCII通信であればCR(13)/LF(10)が来れば終わりといえるかもしれない。つまり、扱うデータによってその定義は変化する。今回はバイナリ通信なので、CR/LFやEOFをデータの終わりとすることはできない。

なのでバイト間タイムアウトというものを使う。これは何かというと、最後のバイトを受信してから一定期間データを受信しなかったら、タイムアウトを発生させるというもの。連続して受信している時点ではタイムアウトは発生しないので、タイムアウトが発生したらデータを受信しきった。と判断できるといえる。

ただ、先ほども言っていた通り、TERMIOSで設定するVTIMEは最小で100msとシリアル通信としては比較的遅いタイムアウトになってしまうため、別のタイムアウトの機構が必要になってくる。

ハードウェア受信割り込み

そこで登場するのがハードウェア割り込みだ。シリアル通信チップから受信時の割り込みを発生させ、CPUがそれを捕捉する。今使っているラズパイはRaspberry pi 2 Model B なので シリアル通信は CPU上に組み込まれている PL011 UART が行っている。PL011 にはいくつかの割り込みがあるが、今回注目すべきなのはその中の2つ、受信割り込みと受信タイムアウト割り込みだ。

受信割り込み

受信割り込みは受信データの量があらかじめ決められた量(ウォーターレベル)を超えた時に発生する。ウォーターレベル、UARTが持っているFIFOのサイズ に対して割合で指定する。Raspberry Pi では PL011のFIFO サイズを16バイトに設定してあり、その半分のデータが受信されれば、受信割り込みが発生する。つまりは8バイト受信するごとに割り込みが発生する。

受信タイムアウト割り込み

受信タイムアウト割り込みは32bitの間、何もデータが受信されなかった場合に発生する。ただし、元からFIFOにデータがない場合は発生しない。

データ完了待ち戦略

ということは、受信データが8バイトであれば次のデータがあるかもしれないので、受信待ちを継続し、8以外であれば受信終了とすればいい。

...

本当にそうだろうか?例えば、データが8バイト丁度の時は結局、TERMIOSのバイト間タイムアウトに頼るしかなくなってしまう。この案は失敗だった。

バイト間タイマーの単位変更

次に思いついたのが、バイト間タイマー(VTIME)の単位を100msからもっと小さい値に変更することだった。ただ、全ての端末に関係することと、カーネルの修正が必要だったことからこの方法は実験していない。

自前でタイマーを持つ

バイト間タイマーが遅いので、自前でタイマーを持つ戦略をとる。select や poll をタイマ付きで使うことで read にタイムアウトをつける。ただ、何も受信していないときにタイムアウト付きで待つと必ずタイムうアウトになり、かつ、タイムアウトの周期を1msとかにしてしまうと、1msごとにread → タイムアウトのループが発生して結局CPUパワーを使ってしまうので、ハードウェア受信割り込みと併用して、8バイトの受信があったときのみ 自前のタイマーで待ち受けるという処理にした。この戦略は意外とうまくいったのだが、今回はこの方法を見送った。 というのも、通信ボーレート(bps)に依ってタイムアウトを変更したいと思っていたのだが、2400 bps だと最大で約40ms の待ちを設けなければならないことが分かったためだ。なぜ、そうなるのかを説明すると、8バイト受信後、まだデータが存在していないことがわかるのは、次の受信割り込みのタイミングまでの時間で割り込みがかからなかった場合に判明する。受信割り込みのほうが割り込みがかかる時間が長いので、そちらがかからない場合にようやく次の受信がなかったということになる。8バイト受信に必要な時間は、8バイト × (1バイトに含まれるビット数) / (ボーレート) となるので、1バイトに含まれるビット数が11bit だとすると、0.036 秒 つまり 36 ms となる。ある程度の遅延があるとして約40ms のタイムアウトになる。

1バイトだけ残す

FIFOにデータがない場合は受信割り込みがかからないということは、逆に言えば、1バイトでもデータがあれば、受信タイムアウト割り込みによって割り込みが発生するということになる。なので、8バイト受信したときにFIFOから7バイト受信するだけにしておき、1バイト残しておけば、次のサイクルで受信データがなければ受信タイムアウト割り込みが発生するはずだ。ということは、受信タイムアウトが発生した場合は必ずデータがそこで終了したということを表すことになる。そのためにはシリアルドライバの修正が必要になる。

github.com

Raspbian のソースは上記リポジトリにある。その中のPL011ドライバのソースは drivers/tty/serial/amba-pl011.c がそれに当たる。修正箇所は以下の通り

  1. uart_amba_port に 受信タイムアウト割り込み回数を記録するための変数を用意
  2. pl011_ioctl 関数を実装
  3. pl011_fifo_to_tty 関数 で受信タイムアウト割り込みの個数をカウント
  4. pl011_fifo_to_tty 関数 で受信割り込みの場合はFIFOからのデータ転送を7バイトに抑える処理を追加
  5. pl011_fifo_to_tty 関数 で割り込みステータスを判定するために 呼び出し元関数(pl011_rx_chars, pl011_int)を修正

uart_amba_port 構造体

受信タイムアウト割り込みが発生したことをユーザープログラムから認識するために、受信タイムアウト割り込みが発生した個数を記録しておき、ioctrl で取得するようにする。そのために、uart_amba_port にカウンタ変数を用意する。uart_amba_port は PL011 UART ポートに必要な情報を格納するための構造体。汎用UARTポート構造体の uart_port をラップする形で定義されている。

struct uart_amba_port {
    struct uart_port   port;
    const u16      *reg_offset;
    struct clk     *clk;
    const struct vendor_data *vendor;
    unsigned int      dmacr;      /* dma control reg */
    unsigned int      im;     /* interrupt mask */
    unsigned int      old_status;
    unsigned int      fifosize;   /* vendor-specific */
    unsigned int      old_cr;     /* state during shutdown */
    bool           autorts;
    unsigned int      fixed_baud; /* vendor-set fixed baud rate */
    char           type[12];
    unsgined long rxto; /* ← この行を追加 */
#ifdef CONFIG_DMA_ENGINE
    /* DMA stuff */
    bool           using_tx_dma;
    bool           using_rx_dma;
    struct pl011_dmarx_data dmarx;
    struct pl011_dmatx_data    dmatx;
    bool           dma_probed;
#endif
};

pl011_ioctl 関数

amba-pl011.c にはioctlが実装されていないので、実装する必要がある。

static int pl011_ioctl(struct uart_port *port, unsigned int cmd, unsigned long arg)
{
    struct uart_amba_port *uap =
        container_of(port, struct uart_amba_port, port);

    int ret = -ENOIOCTLCMD;

    switch(cmd) {
        case 0x80000001:
        {
            unsigned long *counter = (unsigned long *)arg;
            if(arg != 0)
                *counter = uap->rxto;
            ret = 0;
            break;
        }
    }
    return ret;
}

この関数を上位から呼び出すために amba_pl011_pops 構造体に追加する

static const struct uart_ops amba_pl011_pops = {
    .tx_empty   = pl011_tx_empty,
    .set_mctrl  = pl011_set_mctrl,
    .get_mctrl  = pl011_get_mctrl,
    .stop_tx    = pl011_stop_tx,
    .start_tx   = pl011_start_tx,
    .stop_rx    = pl011_stop_rx,
    .enable_ms  = pl011_enable_ms,
    .break_ctl  = pl011_break_ctl,
    .startup    = pl011_startup,
    .shutdown   = pl011_shutdown,
    .flush_buffer   = pl011_dma_flush_buffer,
    .set_termios    = pl011_set_termios,
    .type       = pl011_type,
    .release_port   = pl011_release_port,
    .request_port   = pl011_request_port,
    .config_port    = pl011_config_port,
    .verify_port    = pl011_verify_port,
    .ioctl = pl011_ioctl, /* ← この行を追加する */
#ifdef CONFIG_CONSOLE_POLL
    .poll_init     = pl011_hwinit,
    .poll_get_char = pl011_get_poll_char,
    .poll_put_char = pl011_put_poll_char,
#endif
};

この関数はserial_core.c 内の ioctl関数から、与えられたコマンドに合致しない場合のみ呼ばれる。

pl011_fifo_to_tty 関数

pl011_fifo_to_tty 関数にて FIFOからttyのバッファにデータを転送しているので、その部分で受信タイムアウト割り込みが発生した回数を記録すると同時に、受信割り込みだった場合は転送を7バイトに抑える処理を入れる。

// まず、引数に割り込みステータス(stat)を追加する
static int pl011_fifo_to_tty(struct uart_amba_port *uap, unsigned int stat)
{
    u16 status;
    unsigned int ch, flag, max_count = 256;
    int fifotaken = 0;

// 受信タイムアウト割り込みのカウンタを増やす
    if (stat & UART011_RTIS)
        uap->rxto++;

    while (max_count--) {
        status = pl011_read(uap, REG_FR);
        if (status & UART01x_FR_RXFE)
            break;

// 受信割り込みの場合はデータ転送量を7バイトに制限している
        if (stat & UART011_RXIS && fifotaken >= 7)
            break;

        /* Take chars from the FIFO and update status */
        ch = pl011_read(uap, REG_DR) | UART_DUMMY_DR_RX;
        flag = TTY_NORMAL;
        uap->port.icount.rx++;
        fifotaken++;

        if (unlikely(ch & UART_DR_ERROR)) {
            if (ch & UART011_DR_BE) {
                ch &= ~(UART011_DR_FE | UART011_DR_PE);
                uap->port.icount.brk++;
                if (uart_handle_break(&uap->port))
                    continue;
            } else if (ch & UART011_DR_PE)
                uap->port.icount.parity++;
            else if (ch & UART011_DR_FE)
                uap->port.icount.frame++;
            if (ch & UART011_DR_OE)
                uap->port.icount.overrun++;

            ch &= uap->port.read_status_mask;

            if (ch & UART011_DR_BE)
                flag = TTY_BREAK;
            else if (ch & UART011_DR_PE)
                flag = TTY_PARITY;
            else if (ch & UART011_DR_FE)
                flag = TTY_FRAME;
        }

        if (uart_handle_sysrq_char(&uap->port, ch & 255))
            continue;

        uart_insert_char(&uap->port, ch, UART011_DR_OE, ch, flag);
    }

    return fifotaken;
}

pl011_rx_chars, pl011_int 関数

pl011_fifo_to_tty の中で割り込みステータスを判定するために、呼び出し側からステータス情報を渡してやる必要がある。pl011_fifo_to_ttypl011_rx_chars から呼ばれており、その関数はpl011_intから呼ばれている。割り込みステータスはpl011_intで取得されるのでそのデータをpl011_rx_chars経由で渡してやる必要がある。

static irqreturn_t pl011_int(int irq, void *dev_id)
{
    struct uart_amba_port *uap = dev_id;
    unsigned long flags;
    unsigned int status, pass_counter = AMBA_ISR_PASS_LIMIT;
    u16 imsc;
    int handled = 0;

    spin_lock_irqsave(&uap->port.lock, flags);
    imsc = pl011_read(uap, REG_IMSC);
    status = pl011_read(uap, REG_RIS) & imsc;
    if (status) {
        do {
            check_apply_cts_event_workaround(uap);

            pl011_write(status & ~(UART011_TXIS|UART011_RTIS|
                           UART011_RXIS),
                    uap, REG_ICR);

            if (status & (UART011_RTIS|UART011_RXIS)) {
                if (pl011_dma_rx_running(uap))
                    pl011_dma_rx_irq(uap);
                else
                    pl011_rx_chars(uap, status); // ステータスを渡してやる必要がある
            }
            if (status & (UART011_DSRMIS|UART011_DCDMIS|
                      UART011_CTSMIS|UART011_RIMIS))
                pl011_modem_status(uap);
            if (status & UART011_TXIS)
                pl011_tx_chars(uap, true);

            if (pass_counter-- == 0)
                break;

            status = pl011_read(uap, REG_RIS) & imsc;
        } while (status != 0);
        handled = 1;
    }

    spin_unlock_irqrestore(&uap->port.lock, flags);

    return IRQ_RETVAL(handled);
}
// ステータス用の引数を足す
static void pl011_rx_chars(struct uart_amba_port *uap, unsgiend int status)
__releases(&uap->port.lock)
__acquires(&uap->port.lock)
{
    pl011_fifo_to_tty(uap, status); // ステータスを渡す

    spin_unlock(&uap->port.lock);
    tty_flip_buffer_push(&uap->port.state->port);
    /*
    * If we were temporarily out of DMA mode for a while,
    * attempt to switch back to DMA mode again.
    */
    if (pl011_dma_rx_available(uap)) {
        if (pl011_dma_rx_trigger_dma(uap)) {
            dev_dbg(uap->port.dev, "could not trigger RX DMA job "
                "fall back to interrupt mode again\n");
            uap->im |= UART011_RXIM;
            pl011_write(uap->im, uap, REG_IMSC);
        } else {
#ifdef CONFIG_DMA_ENGINE
            /* Start Rx DMA poll */
            if (uap->dmarx.poll_rate) {
                uap->dmarx.last_jiffies = jiffies;
                uap->dmarx.last_residue  = PL011_DMA_BUFFER_SIZE;
                mod_timer(&uap->dmarx.timer,
                    jiffies +
                    msecs_to_jiffies(uap->dmarx.poll_rate));
            }
#endif
        }
    }
    spin_lock(&uap->port.lock);
}

カーネルコンパイル

修正が終わったら、カーネルコンパイルする必要がある。方法は公式ドキュメントが詳しく正しいのでそちらを参考にしてもらいたい。

Kernel building - Raspberry Pi Documentation

修正版カーネルを使っての読み出し

修正版を使っての読み出しサンプルをリポジトリに公開した。そちらを参考にしてほしい。

github.com

サンプルでは受信データ数と受信割り込み数が前回値と異なっていた場合に受信完了としている。これは、受信割り込み数のみをチェックしていた場合、タイミングによっては、ttyバッファからの読み取りが途中であっても、受信割り込み数が異なる場合があったので、確実性を重視して、受信データ数もカウントに入れている。

今後の課題

最終的に理想に近い形で受信完了を判定できたが、115200 bps で送受信を繰り返していると、数十回に一回、数msのウェイトが入ってしまう場合があるようだ。nice 値を -20 に設定したとしても発生するが、何が原因が突き止められていない。できれば安定した受信ができればよいのだが今のところ目処が立たないでいる。いつかはそのあたりも解決していきたい。 (追記 : 2018/08/26 23:35) chrt コマンドを使用して、最大プライオリティを設定することで安定動作になることが判明した。

$ sudo chrt -f 99 ./a.out -b 115200 -f result.txt

ただ、ほかのタスクが完全に止まってしまうのでリスクはある。それを考えると、やはりシビアなタイミングを実行するようなプログラムはマイコンリアルタイムOSを使用するのが無難なのかもしれない。