commit 94bd5f470d032edc611f095d11618732d5f6bfe1
parent 0f0992bce47f0af0af0ceccbca8f75ea8eb93d08
Author: Tomas Hlavaty <tom@logand.com>
Date:   Sun,  5 May 2013 21:18:27 +0200
server protocol implemented
Diffstat:
| M | Makefile |  |  | 4 | ++-- | 
| M | common.c |  |  | 57 | +++++++++++++++++++++++++++++++++++++++++++++++++-------- | 
| M | dbquery-pg.c |  |  | 122 | ++++++++++++++++++++++++++++++++++++++++++++++++++++--------------------------- | 
| M | dbquery.asd |  |  | 1 | + | 
| M | dbquery.lisp |  |  | 327 | ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++------------------- | 
5 files changed, 381 insertions(+), 130 deletions(-)
diff --git a/Makefile b/Makefile
@@ -6,8 +6,8 @@ all:
 .c.o:
 	$(CC) $(CFLAGS) -c -o $@ $<
 
-dbquery-pg: dbquery-pg.c common.o
-	$(CC) $(CFLAGS) -o $@ $< common.o -lpq
+dbquery-pg: dbquery-pg.c common.c
+	$(CC) $(CFLAGS) -o $@ $< -lpq
 	strip $@
 
 dbquery-mysql: dbquery-mysql.c common.o
diff --git a/common.c b/common.c
@@ -26,24 +26,65 @@
 #include <stdlib.h>
 #include <stdio.h>
 
-#define BLEN 4096
+#define HLEN 102400
 
-void query(char *q);
+enum Command {ERROR, QUERY, PREPARE, EXECUTE, DEALLOCATE};
 
-void die(const char *format, ...) {
+static void die(const char *format, ...) {
   va_list argv;
   va_start(argv, format);
   vfprintf(stderr, format, argv);
   va_end(argv);
   fprintf(stderr, "\n");
   fflush(stderr);
+  fflush(stdout);
   exit(-1);
 }
 
-void repl() {
-  for(; !feof(stdin);) {
-    char buf[BLEN];
-    if(!fgets(buf, BLEN, stdin)) break;
-    query(buf);
+static void pnum(int x) {printf("%d\n", x);}
+static void pqstr(char *x) {
+  putchar('"');
+  for(; *x; x++) {
+    switch(*x) {
+    case '\\': putchar('\\'); putchar('\\'); break;
+    case '"': putchar('\\'); putchar('"'); break;
+    default: putchar(*x);
+    }
   }
+  putchar('"');
+  putchar('\n');
 }
+static void pnil() {pqstr("NIL");}
+
+static int rnum(void) { // TODO limit interval
+  int z = 0, y = 0;
+  for(;;) {
+    int c = getchar();
+    if('0' <= c && c <= '9') {z = 10 * z + c - '0'; y = 1;}
+    else if('\n' == c && y) goto done;
+    else die("expected number");
+  }
+ done:
+  return z;
+}
+
+static char heap[HLEN];
+static char *buf;
+
+static char *rstr(void) {
+  char *z = buf;
+  if('"' != getchar()) die("expected string");
+  for(;;) {
+    if(heap + HLEN <= buf) die("out of memory");
+    int c = getchar();
+    switch(c) {
+    case '\\': *buf++ = getchar(); break;
+    case '"': if('\n' != getchar()) die("expected eol"); *buf++ = 0; goto done;
+    default: *buf++ = c;
+    }
+  }
+ done:
+  return z;
+}
+
+static int eof(void) {return EOF == ungetc(getchar(), stdin);}
diff --git a/dbquery-pg.c b/dbquery-pg.c
@@ -25,50 +25,88 @@
 #include <stdio.h>
 #include <libpq-fe.h>
 
-void die(const char *format, ...);
-void repl();
+#include "common.c"
 
-static PGconn *conn;
-
-void query(char *q) {
-  PGresult *z = PQexec(conn, q);
-  if(!z) die("error: query failed");
-  int i, j, n = PQntuples(z), m = PQnfields(z);
-  printf("((");
-  for(j = 0; j < m; j++) {
-    if(0 < j) printf(" ");
-    printf("\"%s\"", PQfname(z, j));
-  }
-  printf(")\n (");
-  for(j = 0; j < m; j++) {
-    if(0 < j) printf(" ");
-    printf("%d", PQftype(z, j));
-  }
-  printf(")");
-  for(i = 0; i < n; i++) {
-    printf("\n (");
-    for(j = 0; j < m; j++) {
-      if(0 < j) printf(" ");
-      if(PQgetisnull(z, i, j))
-        printf("NIL");
-      else
-        switch(PQftype(z, j)) {
-        case 1700: // float
-        case 23: printf("%s", PQgetvalue(z, i, j)); break; // int
-        default:
-          printf("\"%s\"", PQgetvalue(z, i, j));
+int main(int argc, char *argv[]) {
+  if(argc != 2) die("usage: %s conninfo", argv[0]);
+  PGconn *conn = PQconnectdb(argv[1]);
+  if(!conn || CONNECTION_OK != PQstatus(conn)) die("error: connection failed");
+  for(;;) {
+    if(eof()) return 0;
+    buf = heap;
+    int i, j, cmd = rnum(), nargs = rnum();
+    PGresult *z;
+    {
+      Oid atype[nargs];
+      char *aval[nargs];
+      int alen[nargs], afmt[nargs];
+      char *q, *stm;
+      switch(cmd) {
+      case QUERY:
+        q = rstr();
+        for(i = 0; i < nargs; i++) {
+          atype[i] = rnum();
+          aval[i] = rstr();
+          alen[i] = 0;
+          afmt[i] = 0;
+        }
+        z = PQexecParams(conn, q, nargs, atype, (const char * const *) aval, alen, afmt, 0);
+        if(PGRES_TUPLES_OK != PQresultStatus(z)) goto fail;
+        goto rows;
+      case PREPARE:
+        stm = rstr();
+        q = rstr();
+        for(i = 0; i < nargs; i++)
+          atype[i] = rnum();
+        z = PQprepare(conn, stm, q, nargs, atype);
+        if(PGRES_COMMAND_OK != PQresultStatus(z)) goto fail;
+        pnum(cmd);
+        goto done;
+      case EXECUTE:
+        stm = rstr();
+        for(i = 0; i < nargs; i++) {
+          aval[i] = rstr();
+          alen[i] = 0;
+          afmt[i] = 0;
         }
+        z = PQexecPrepared(conn, stm, nargs, (const char * const *) aval, alen, afmt, 0);
+        if(PGRES_TUPLES_OK != PQresultStatus(z)) goto fail;
+        goto rows;
+      default: die("unexpected command %d", cmd);
+      }
     }
-    printf(")");
+  rows:
+    pnum(cmd);
+    int n = PQntuples(z), m = PQnfields(z);
+    pnum(n);
+    pnum(m);
+    for(j = 0; j < m; j++) {
+      pnum(PQftype(z, j));
+      pqstr(PQfname(z, j));
+    }
+    for(i = 0; i < n; i++) {
+      for(j = 0; j < m; j++) {
+        if(PQgetisnull(z, i, j))
+          pnil();
+        else
+          pqstr(PQgetvalue(z, i, j));
+        /* switch(PQftype(z, j)) { */
+        /* case 1700: // float */
+        /* case 23: // int */
+        /*   pstr(PQgetvalue(z, i, j)); */
+        /*   break; */
+        /* default: */
+        /*   pqstr(PQgetvalue(z, i, j)); */
+        /* } */
+      }
+    }
+    goto done;
+  fail:
+    pnum(ERROR);
+    pqstr(PQresultErrorField(z, PG_DIAG_SQLSTATE));
+    pqstr(PQerrorMessage(conn));
+  done:
+    fflush(stdout);
+    PQclear(z);
   }
-  printf(")\n");
-  fflush(stdout);
-}
-
-int main(int argc, char **argv) {
-  if(argc < 2) die("usage: %s conninfo [query]", argv[0]);
-  if(!(conn = PQconnectdb(argv[1])) || (PQstatus(conn) != CONNECTION_OK))
-    die("error: connection failed");
-  if(argc < 3) repl(); else query(argv[2]);
-  return 0;
 }
diff --git a/dbquery.asd b/dbquery.asd
@@ -32,5 +32,6 @@
   :author "Tomas Hlavaty <tom@logand.com>"
   :maintainer "Tomas Hlavaty <tom@logand.com>"
   :licence "MIT"
+  :depends-on (#+sbcl :sb-concurrency)
   :serial t
   :components ((:file "dbquery")))
diff --git a/dbquery.lisp b/dbquery.lisp
@@ -22,64 +22,93 @@
 
 (defpackage :dbquery
   (:use :cl)
-  (:export :make-mysql-server
+  (:export :query
+           :prepare
+           :execute
+           :deallocate
+           :finish
            :make-pg-server
+           :make-mysql-server
            :make-sqlite-server))
 
 (in-package :dbquery)
 
-(defun call-with-program-output (command fn)
-  (let ((x (ccl:run-program (car command) (cdr command) :output :stream)))
-    (funcall fn (ccl:external-process-output-stream x))
-    #+nil
-    (unwind-protect (funcall fn (ccl:external-process-output-stream x))
-      (ccl:external-process-status ))))
-
-(defmacro with-program-output ((stream command) &body body)
-  `(call-with-program-output ,command (lambda (,stream) ,@body)))
-
-#+nil
-(time
- (with-program-output (s (list "./dbquery-sqlite"
-                               "sqlite.db"
-                               "select 123, 1, 0, null, 12.34, 'hello'"))
-   (read s)))
-
-#+nil
-(time
- (with-program-output (s (list "./dbquery-pg"
-                               "dbname='pokus' user='tomas'"
-                               "select 123, 1, 0, null, 12.34, 'hello'"))
-   (read s)))
-
-#+nil
-(time
- (with-program-output (s (list "./dbquery-mysql"
-                               "localhost"
-                               "tomas"
-                               "Ri3OoL3h"
-                               "pokus"
-                               "select 123, 1, 0, null, 12.34, 'hello'"
-                               ))
-   (read s)))
+;; (defun call-with-program-output (command fn)
+;;   #+ccl
+;;   (let ((x (ccl:run-program (car command) (cdr command) :output :stream)))
+;;     (funcall fn (ccl:external-process-output-stream x))
+;;     #+nil
+;;     (unwind-protect (funcall fn (ccl:external-process-output-stream x))
+;;       (ccl:external-process-status )))
+;;   #+ecl
+;;   (multiple-value-bind (io status proc)
+;;       (ext:run-program (car command) (cdr command)
+;;                        :input nil
+;;                        :output :stream
+;;                        :error nil ;;t
+;;                        :wait nil)
+;;     (when proc
+;;       (funcall fn io)
+;;       #+nil
+;;       (unwind-protect (funcall fn io)
+;;         (close io)
+;;         (ext:external-process-wait proc))))
+;;   #-(or ccl ecl)
+;;   (error "DBQUERY::CALL-WITH-PROGRAM-OUTPUT not implemented"))
+
+;; (defmacro with-program-output ((stream command) &body body)
+;;   `(call-with-program-output ,command (lambda (,stream) ,@body)))
+
+;; ;;(with-program-output (s '("sha1sum" "/etc/passwd")))
+;; ;;(with-program-output (s '("sha1sum" "/etc/passwd")) (read-line s))
+
+(defmacro with-lock ((lock) &body body)
+  #+ccl `(ccl:with-lock-grabbed (,lock) ,@body)
+  #+ecl `(mp:with-lock (,lock) ,@body)
+  #+sbcl `(sb-concurrency::with-mutex (,lock) ,@body)
+  #-(or ccl ecl sbcl) (error "DBQUERY::WITH-LOCK not implemented"))
+
+(defun make-lock (name)
+  #+ccl (ccl:make-lock name)
+  #+ecl (mp:make-lock :name name)
+  #+sbcl (sb-concurrency::make-mutex :name (string name))
+  #-(or ccl ecl sbcl) (error "DBQUERY::MAKE-LOCK not implemented"))
+
+(defun make-semaphore ()
+  #+ccl (ccl:make-semaphore)
+  #+ecl (mp:make-semaphore)
+  #+sbcl (sb-concurrency::make-semaphore)
+  #-(or ccl ecl sbcl) (error "DBQUERY::MAKE-SEMAPHORE not implemented"))
+
+(defun signal-semaphore (x)
+  #+ccl (ccl:signal-semaphore x)
+  #+ecl (mp:signal-semaphore x)
+  #+sbcl (sb-concurrency::signal-semaphore x)
+  #-(or ccl ecl sbcl) (error "DBQUERY::SIGNAL-SEMAPHORE not implemented"))
+
+(defun wait-on-semaphore (x)
+  #+ccl (ccl:wait-on-semaphore x)
+  #+ecl (mp:wait-on-semaphore x)
+  #+sbcl (sb-concurrency::wait-on-semaphore x)
+  #-(or ccl ecl sbcl) (error "DBQUERY::WAIT-ON-SEMAPHORE not implemented"))
 
 (defun make-concurrent-queue ()
   (let ((x (cons nil nil))
-        (l (ccl:make-lock 'concurrent-queue-lock))
-        (s (ccl:make-semaphore)))
+        (l (make-lock 'concurrent-queue-lock))
+        (s (make-semaphore)))
     (setf (car x) x)
     (lambda (&optional (value nil valuep))
       (if valuep
           (let ((y (cons value nil)))
-            (ccl:with-lock-grabbed (l)
+            (with-lock (l)
               (setf (cdar x) y
                     (car x) y)
-              (ccl:signal-semaphore s))
+              (signal-semaphore s))
             value)
           (do (done z)
               (done z)
-            (ccl:wait-on-semaphore s)
-            (ccl:with-lock-grabbed (l)
+            (wait-on-semaphore s)
+            (with-lock (l)
               (unless (eq x (car x))
                 (setq done t
                       z (pop (cdr x)))
@@ -92,53 +121,173 @@
 ;; (funcall q 3)
 ;; (funcall q)
 
-(defun make-program-server (command args writer reader)
+(defun make-thread (name fn)
+  #+ccl (ccl:process-run-function name fn)
+  #+ecl (mp:process-run-function name fn)
+  #+sbcl (sb-concurrency::make-thread fn :name (string name))
+  #-(or ccl ecl sbcl) (error "DBQUERY::MAKE-THREAD not implemented"))
+
+(defun make-program (command args)
+  #+ccl
   (let ((p (ccl:run-program command
                             args
                             :input :stream
                             :output :stream
+                            :error nil
                             :sharing :external
                             :wait nil)))
     (assert (eq :running (ccl:external-process-status p)))
-    (let ((wq (make-concurrent-queue)))
-      (ccl:process-run-function
-       'program-server-writer
-       (let ((s (ccl:external-process-input-stream p)))
-         (lambda ()
-           (do (x)
-               ((not (setq x (funcall wq)))
-                (close s))
-             (funcall writer x s)))))
-      (let ((l (ccl:make-lock 'program-server-lock))
-            (s (ccl:external-process-output-stream p)))
-        (lambda (query)
-          (ccl:with-lock-grabbed (l)
-            (when wq
-              (cond
-                (query
-                 (funcall wq query)
-                 (funcall reader s))
-                (t (funcall wq nil)
-                   (setq wq nil)
-                   (ccl::external-process-wait p)
-                   (multiple-value-bind (status code) (ccl:external-process-status p)
-                     (assert (eq :exited status))
-                     (assert (zerop code))))))))))))
-
-(defun dbquery-writer (value stream)
-  (write-line value stream)
-  (finish-output stream))
-
-(defun dbquery-reader (stream) ;; TODO raise backend errors!
-  (let (*read-eval*)
-    (prog1 (read stream nil nil)
-      (assert (char= #\newline (read-char stream))))))
+    (lambda (msg)
+      (ecase msg
+        (status-and-code (ccl:external-process-status p))
+        (input-stream (ccl:external-process-input-stream p))
+        (output-stream (ccl:external-process-output-stream p))
+        (wait (ccl::external-process-wait p)))))
+  #+ecl
+  (multiple-value-bind (io x p)
+      (ext:run-program command
+                       args
+                       :input :stream
+                       :output :stream
+                       :error nil
+                       :wait nil)
+    (declare (ignore x))
+    (assert (eq :running (ext:external-process-status p)))
+    (lambda (msg)
+      (ecase msg
+        (status-and-code (ext:external-process-status p))
+        (input-stream io)
+        (output-stream io)
+        (wait (ext:external-process-wait p)))))
+  #+sbcl
+  (let ((p (sb-ext:run-program command
+                               args
+                               :input :stream
+                               :output :stream
+                               :error nil
+                               :wait nil)))
+    (assert (eq :running (sb-ext:process-status p)))
+    (lambda (msg)
+      (ecase msg
+        (status-and-code (sb-ext:process-status p))
+        (input-stream (sb-ext:process-input p))
+        (output-stream (sb-ext:process-output p))
+        (wait (sb-ext:process-wait p)))))
+  #-(or ccl ecl sbcl)
+  (error "DBQUERY::MAKE-PROGRAM not implemented"))
 
-(defun make-sqlite-server (command db)
-  (make-program-server command (list db) 'dbquery-writer 'dbquery-reader))
+(defun make-program-server (command args writer reader)
+  (let ((p (make-program command args))
+        (wq (make-concurrent-queue)))
+    (make-thread 'program-server-writer
+                 (let ((s (funcall p 'input-stream)))
+                   (lambda ()
+                     (do (x)
+                         ((not (setq x (funcall wq)))
+                          (close s))
+                       (funcall writer x s)))))
+    (let ((l (make-lock 'program-server-lock))
+          (s (funcall p 'output-stream)))
+      (lambda (query)
+        (with-lock (l)
+          (when wq
+            (cond
+              (query
+                  (funcall wq query)
+                (funcall reader s))
+              (t (funcall wq nil)
+                 (setq wq nil)
+                 (funcall p 'wait)
+                 (multiple-value-bind (status code) (funcall p 'status-and-code)
+                   (assert (eq :exited status))
+                   (assert (zerop code)))))))))))
+
+(defun query (server q &rest args)
+  (funcall server `(:query ,q ,@args)))
+
+(defun prepare (server stm q &rest args)
+  (funcall server `(:prepare ,stm ,q ,@args)))
+
+(defun execute (server stm &rest args)
+  (funcall server `(:execute ,stm ,@args)))
+
+(defun deallocate (server stm)
+  (funcall server `(:deallocate ,stm)))
+
+(defun finish (server)
+  (funcall server nil))
+
+(defun rnum ()
+  (do (done y (z 0))
+      (done z)
+    (let ((c (read-char)))
+      (cond
+        ((char<= #\0 c #\9)
+         (setq y t
+               z (+ (* 10 z) (char-code c) #.(- (char-code #\0)))))
+        ((and (char= #\newline c) y) (setq done t))
+        (t (error "expected number ~d ~s" z c))))))
+
+(defun rstr ()
+  (assert  (char= #\" (read-char)))
+  (with-output-to-string (*standard-output*)
+    (do (done)
+        (done)
+      (let ((c (read-char)))
+        (case c
+          (#\\ (write-char (read-char)))
+          (#\" (assert (char= #\newline (read-char)))
+               (setq done t))
+          (t (write-char c)))))))
+
+(defun dbquery-pg-writer (value *standard-output*)
+  (destructuring-bind (cmd &rest rest) value
+    (flet ((%query (q &rest args)
+             (format t "1~%~d~%~s~%" (length args) q)
+             (dolist (a args)
+               (etypecase a
+                 (integer (format t "~s~%\"~s\"~%" 23 a))
+                 (string (format t "~s~%~s~%" 705 a))))))
+      (ecase cmd
+        (:query (apply #'%query rest))
+        (:prepare
+         (destructuring-bind (stm q &rest args) rest
+           (format t "2~%~d~%~s~%~s~%" (length args) stm q)
+           (dolist (a args)
+             (etypecase a
+               (integer (format t "~s~%" 23))
+               (string (format t "~s~%" 705))))))
+        (:execute
+         (destructuring-bind (stm &rest args) rest
+           (format t "3~%~d~%~s~%" (length args) stm)
+           (dolist (a args)
+             (etypecase a
+               (integer (format t "\"~s\"~%" a))
+               (string (format t "~s~%" a))))))
+        (:deallocate
+         (destructuring-bind (stm) rest
+           (%query (format nil "DEALLOCATE ~s" stm)))))))
+  (finish-output))
+
+(defun dbquery-pg-reader (*standard-input*)
+  (ecase (rnum)
+    (0 (error "Database error ~s ~s" (rstr) (rstr)))
+    ((1 3)
+     (let ((n (rnum))
+           (m (rnum)))
+       (when (and (plusp n) (plusp m))
+         (cons (loop for i from 0 below m collect (cons (rnum) (rstr)))
+               (loop
+                  for i from 0 below n
+                  appending (loop for j from 0 below m collect (rstr)))))))
+    (2)))
 
 (defun make-pg-server (command connection-info)
-  (make-program-server command (list connection-info) 'dbquery-writer 'dbquery-reader))
+  (make-program-server
+   command (list connection-info) 'dbquery-pg-writer 'dbquery-pg-reader))
+
+(defun make-sqlite-server (command db)
+  (make-program-server command (list db) 'dbquery-writer 'dbquery-reader))
 
 (defun make-mysql-server (command host user password db)
   (make-program-server command (list host user password db) 'dbquery-writer 'dbquery-reader))
@@ -161,3 +310,25 @@
 ;; (time (funcall c "select 1, 2+3"))
 ;; (time (funcall c "select 4, 'hello'"))
 ;; (funcall c nil)
+
+;;; (() ...) first car param types, then query, then params
+
+;; in: nargs query [[type param]...]
+;; out: ncol nrow [[[ctyp cname]...] [row...]]
+
+;; 0 "select 1, 2+3" => 2 1 91 91 "1" "2+3" "1" "5"
+;; 2 "select 1, 2+3, $1, $2" 0 "hi" 0 "123" => 4 1 91 91 92 93 "1" "2+3" "$1" "$2" "1" "5" "hi" "123"
+
+;;(dbquery-pg-writer '(:query "select $1, 1, 2+3" 123) *standard-output*)
+;;(dbquery-pg-writer '(:prepare "stm1" "select $1, 1, 2+3" 1234567890) *standard-output*)
+;;(dbquery-pg-writer '(:execute "stm1" 1234567890) *standard-output*)
+;;(dbquery-pg-writer '(:deallocate "stm1") *standard-output*)
+
+;;(defparameter c (make-pg-server "/home/tomas/git/dbquery/dbquery-pg" "dbname='pokus' user='tomas'"))
+;;(query c "select 1, 2+3")
+;;(query c "select 1, 2+3 from hi")
+;;(query c "select $1, 1, 2+3" 1234567890)
+;;(query c "select $1, 1, 2+3" 1234567890123456789)
+;;(prepare c "stm1" "select $1, 1, 2+3" 1234567890)
+;;(execute c "stm1" 890)
+;;(deallocate c "stm1") <<<< TODO crashes