Đăng nhập Tài khoản VIP

Golang + Postgres | Notifications: LISTEN/NOTIFY

Yo! Hôm nay chúng ta sẽ khám phá một tính năng rất hay từ thư viện làm việc với Postgres của Golang, ref > https://github.com/lib/pq


Đặt vấn đề: Mỗi khi có một sự kiện INSERT or UPDATE ở một TABLE nào đó trong DATABASE thì bên ngoài có thể lắng nghe và có những xử lý tương ứng.

Giải quyết: Mình nghĩ tới 2 cách tiếp cận, một là lắng nghe log file từ WAL hoặc sử dụng luôn tính năng Notifications: LISTEN/NOTIFY từ pg lib.


Và hôm nay chúng ta sẽ săm soi Notifications: LISTEN/NOTIFY

Trong Postgres sẽ tạo ra một TRIGGER, mỗi khi có INSERT or UPDATE thì sẽ call function pg_notify ('event_name', json_payload)

-- Tạo table TEST 
create table TEST( ID     serial
                 , TIJD   timestamp
                 , ACTION varchar( 100 ) )

-- Tạo một procedure
create or replace function notify_message()
returns trigger as $$
declare
  payload JSON;
begin
   payload = json_build_object( 
          'action', TG_OP, 
          'data', row_to_json( NEW ) 
        );
   perform pg_notify( 'event_channel', payload::text );       
   return null;
end;
$$ language plpgsql;

-- Tạo trigger mỗi khi insert or update thì call procedure
create trigger taiu_worklog
after insert or update
on TEST -- TEST là table name
for each row
execute procedure notify_message();


Tiếp theo, Golang sẽ lắng nghe channel với tên là 'event_channel' mà mình truyền vào function pg_notify.

package main


import (
  "bytes"
  "database/sql"
  "encoding/json"
  "fmt"
  "time"
  "github.com/lib/pq"
)


func waitForNotification(l *pq.Listener) {
  for {
    select {
    case n := <-l.Notify:
      fmt.Println("Received data from channel [", n.Channel, "] :")
      // Prepare notification payload for pretty print
      var prettyJSON bytes.Buffer
      err := json.Indent(&prettyJSON, []byte(n.Extra), "", "\t")
      if err != nil {
        fmt.Println("Error processing JSON: ", err)
        return
      }
      fmt.Println(string(prettyJSON.Bytes()))
      return
    case <-time.After(90 * time.Second):
      fmt.Println("Received no events for 90 seconds, checking connection")
      go func() {
        l.Ping()
      }()
      return
    }
  }
}


func main() {
  var conninfo string = "dbname=ryan user=ryan sslmode=disable"


  _, err := sql.Open("postgres", conninfo)
  if err != nil {
    panic(err)
  }


  reportProblem := func(ev pq.ListenerEventType, err error) {
    if err != nil {
      fmt.Println(err.Error())
    }
  }


  listener := pq.NewListener(conninfo, 10*time.Second, time.Minute, reportProblem)
  err = listener.Listen("event_channel")
  if err != nil {
    panic(err)
  }


  fmt.Println("Start monitoring PostgreSQL...")
  for {
    waitForNotification(listener)
  }
}



Run Go server sau đó thực hiện một lệnh INSERT vào table TEST kết quả sẽ như sau

insert into TEST( TIJD, ACTION ) values ( now(), 'Test 2' );


go run main.go

Start monitoring PostgreSQL...

Received data from channel [ event_channel ] :

{

	"action": "INSERT",

	"data": {

		"id": 6,

		"tijd": "2020-03-12T08:48:30.613332",

		"action": "Test 2"

	}

}


Tính năng này chúng ta có thể áp dụng cho log event, hay thậm chí là giao tiếp giữa các service ví dụ service A lấy được event từ database sẽ đẩy data trong event này vào một message broker và service B đang lắng nghe message broker này có thể nhận được thông tin đó.


Source: PostgreSQL Listen/Notification proces