/*
  +----------------------------------------------------------------------+
  | Swoole                                                               |
  +----------------------------------------------------------------------+
  | This source file is subject to version 2.0 of the Apache license,    |
  | that is bundled with this package in the file LICENSE, and is        |
  | available through the world-wide-web at the following url:           |
  | http://www.apache.org/licenses/LICENSE-2.0.html                      |
  | If you did not receive a copy of the Apache2.0 license and are unable|
  | to obtain it through the world-wide-web, please send a note to       |
  | license@swoole.com so we can mail you a copy immediately.            |
  +----------------------------------------------------------------------+
  | Author: Zhenyu Wu  <936321732@qq.com>                                |
  +----------------------------------------------------------------------+
 */

#include "php_swoole.h"
#ifdef SW_COROUTINE
#include "swoole_coroutine.h"
#ifdef SW_USE_POSTGRESQL
#include "swoole_postgresql_coro.h"
#include "swoole_coroutine.h"

static PHP_METHOD(swoole_postgresql_coro, __construct);
static PHP_METHOD(swoole_postgresql_coro, __destruct);
static PHP_METHOD(swoole_postgresql_coro, connect);
static PHP_METHOD(swoole_postgresql_coro, query);
static PHP_METHOD(swoole_postgresql_coro, fetchAll);
static PHP_METHOD(swoole_postgresql_coro, affectedRows);
static PHP_METHOD(swoole_postgresql_coro, numRows);
static PHP_METHOD(swoole_postgresql_coro,metaData);
static PHP_METHOD(swoole_postgresql_coro,fetchObject);
static PHP_METHOD(swoole_postgresql_coro,fetchAssoc);
static PHP_METHOD(swoole_postgresql_coro,fetchArray);
static PHP_METHOD(swoole_postgresql_coro,fetchRow);

static void php_pgsql_fetch_hash(INTERNAL_FUNCTION_PARAMETERS, zend_long result_type, int into_object);

static void _destroy_pgsql_link(zend_resource *rsrc);
static void _free_result(zend_resource *rsrc);
static int swoole_pgsql_coro_onRead(swReactor *reactor, swEvent *event);
static int swoole_pgsql_coro_onWrite(swReactor *reactor, swEvent *event);
static int swoole_pgsql_coro_onError(swReactor *reactor, swEvent *event);
int php_pgsql_result2array(PGresult *pg_result, zval *ret_array, long result_type);
static int swoole_postgresql_coro_close(pg_object *pg_object);
static  int query_result_parse(pg_object *pg_object);
static  int meta_data_result_parse(pg_object *pg_object);

ZEND_BEGIN_ARG_INFO_EX(arginfo_swoole_void, 0, 0, 0)
ZEND_END_ARG_INFO()

ZEND_BEGIN_ARG_INFO_EX(arginfo_pg_connect, 0, 0, -1)
    ZEND_ARG_INFO(0, conninfo)
ZEND_END_ARG_INFO()

ZEND_BEGIN_ARG_INFO_EX(arginfo_pg_query, 0, 0, 0)
    ZEND_ARG_INFO(0, query)
ZEND_END_ARG_INFO()

ZEND_BEGIN_ARG_INFO_EX(arginfo_pg_fetch_all, 0, 0, 0)
    ZEND_ARG_INFO(0, result)
    ZEND_ARG_INFO(0, result_type)
ZEND_END_ARG_INFO()

ZEND_BEGIN_ARG_INFO_EX(arginfo_pg_affected_rows, 0, 0, 0)
    ZEND_ARG_INFO(0, result)
ZEND_END_ARG_INFO()

ZEND_BEGIN_ARG_INFO_EX(arginfo_pg_num_rows, 0, 0, 0)
    ZEND_ARG_INFO(0, result)
ZEND_END_ARG_INFO()

ZEND_BEGIN_ARG_INFO_EX(arginfo_pg_meta_data, 0, 0, 1)
    ZEND_ARG_INFO(0, table_name)
ZEND_END_ARG_INFO()

ZEND_BEGIN_ARG_INFO_EX(arginfo_pg_fetch_row, 0, 0, 1)
    ZEND_ARG_INFO(0, result)
    ZEND_ARG_INFO(0, row)
    ZEND_ARG_INFO(0, result_type)
ZEND_END_ARG_INFO()

ZEND_BEGIN_ARG_INFO_EX(arginfo_pg_fetch_assoc, 0, 0, 1)
    ZEND_ARG_INFO(0, result)
    ZEND_ARG_INFO(0, row)
ZEND_END_ARG_INFO()

ZEND_BEGIN_ARG_INFO_EX(arginfo_pg_fetch_array, 0, 0, 1)
    ZEND_ARG_INFO(0, result)
    ZEND_ARG_INFO(0, row)
    ZEND_ARG_INFO(0, result_type)
ZEND_END_ARG_INFO()

ZEND_BEGIN_ARG_INFO_EX(arginfo_pg_fetch_object, 0, 0, 1)
    ZEND_ARG_INFO(0, result)
    ZEND_ARG_INFO(0, row)
    ZEND_ARG_INFO(0, class_name)
    ZEND_ARG_INFO(0, l)
    ZEND_ARG_INFO(0, ctor_params)
ZEND_END_ARG_INFO()

static const zend_function_entry swoole_postgresql_coro_methods[] =
{
    PHP_ME(swoole_postgresql_coro, __construct, arginfo_swoole_void, ZEND_ACC_PUBLIC | ZEND_ACC_CTOR)
    PHP_ME(swoole_postgresql_coro, connect, arginfo_pg_connect, ZEND_ACC_PUBLIC)
    PHP_ME(swoole_postgresql_coro, query, arginfo_pg_query, ZEND_ACC_PUBLIC )
    PHP_ME(swoole_postgresql_coro, fetchAll, arginfo_pg_fetch_all, ZEND_ACC_PUBLIC)
    PHP_ME(swoole_postgresql_coro, affectedRows, arginfo_pg_affected_rows, ZEND_ACC_PUBLIC)
    PHP_ME(swoole_postgresql_coro, numRows, arginfo_pg_num_rows, ZEND_ACC_PUBLIC)
    PHP_ME(swoole_postgresql_coro, metaData, arginfo_pg_meta_data, ZEND_ACC_PUBLIC)
    PHP_ME(swoole_postgresql_coro, fetchObject, arginfo_pg_fetch_object, ZEND_ACC_PUBLIC)
    PHP_ME(swoole_postgresql_coro, fetchAssoc, arginfo_pg_fetch_assoc, ZEND_ACC_PUBLIC)
    PHP_ME(swoole_postgresql_coro, fetchArray, arginfo_pg_fetch_array, ZEND_ACC_PUBLIC)
    PHP_ME(swoole_postgresql_coro, fetchRow, arginfo_pg_fetch_row, ZEND_ACC_PUBLIC)
    PHP_ME(swoole_postgresql_coro, __destruct, arginfo_swoole_void, ZEND_ACC_PUBLIC | ZEND_ACC_DTOR)
    PHP_FE_END
};

static zend_class_entry swoole_postgresql_coro_ce;
static zend_class_entry *swoole_postgresql_coro_class_entry_ptr;
static int le_link , le_result;

void swoole_postgresql_coro_init(int module_number TSRMLS_DC)
{

    INIT_CLASS_ENTRY(swoole_postgresql_coro_ce, "Swoole\\Coroutine\\PostgreSQL", swoole_postgresql_coro_methods);
    le_link = zend_register_list_destructors_ex(_destroy_pgsql_link, NULL, "pgsql link", module_number);
    le_result = zend_register_list_destructors_ex(_free_result, NULL, "pgsql result", module_number);
    swoole_postgresql_coro_class_entry_ptr = zend_register_internal_class(&swoole_postgresql_coro_ce TSRMLS_CC);

    if (SWOOLE_G(use_shortname))
    {
        sw_zend_register_class_alias("Co\\PostgreSQL", swoole_postgresql_coro_class_entry_ptr);
    }

    REGISTER_LONG_CONSTANT("SW_PGSQL_ASSOC", PGSQL_ASSOC, CONST_CS | CONST_PERSISTENT);
    REGISTER_LONG_CONSTANT("SW_PGSQL_NUM", PGSQL_NUM, CONST_CS | CONST_PERSISTENT);
    REGISTER_LONG_CONSTANT("SW_PGSQL_BOTH", PGSQL_BOTH, CONST_CS | CONST_PERSISTENT);

}

static PHP_METHOD(swoole_postgresql_coro, __construct)
{
    coro_check(TSRMLS_C);
    pg_object *pg;
    pg = emalloc(sizeof(pg_object));
    bzero(pg,sizeof(pg_object));
    pg -> object = getThis();
    swoole_set_object(getThis(), pg);
}

static PHP_METHOD(swoole_postgresql_coro, connect)
{
    zval *conninfo;
    PGconn * pgsql;

    ZEND_PARSE_PARAMETERS_START(1, 1)
        Z_PARAM_ZVAL(conninfo)
    ZEND_PARSE_PARAMETERS_END();

    pgsql = PQconnectStart(Z_STRVAL_P(conninfo));
    int fd =  PQsocket(pgsql);

    php_swoole_check_reactor();

    if (!swReactor_handle_isset(SwooleG.main_reactor, PHP_SWOOLE_FD_POSTGRESQL))
    {
        SwooleG.main_reactor->setHandle(SwooleG.main_reactor, PHP_SWOOLE_FD_POSTGRESQL | SW_EVENT_READ, swoole_pgsql_coro_onRead);
        SwooleG.main_reactor->setHandle(SwooleG.main_reactor, PHP_SWOOLE_FD_POSTGRESQL | SW_EVENT_WRITE, swoole_pgsql_coro_onWrite);
        SwooleG.main_reactor->setHandle(SwooleG.main_reactor, PHP_SWOOLE_FD_POSTGRESQL | SW_EVENT_ERROR, swoole_pgsql_coro_onError);
    }

    if (SwooleG.main_reactor->add(SwooleG.main_reactor, fd, PHP_SWOOLE_FD_POSTGRESQL | SW_EVENT_WRITE) < 0)
    {
        swoole_php_fatal_error(E_WARNING, "swoole_event_add failed.");
        RETURN_FALSE;
    }

    pg_object *pg_object = swoole_get_object(getThis());
    pg_object->fd = fd;
    pg_object->conn = pgsql;
    pg_object->status = CONNECTION_STARTED;

    PQsetnonblocking(pgsql , 1);

    if (pgsql==NULL || PQstatus(pgsql)==CONNECTION_BAD)
    {
        swWarn("Unable to connect to PostgreSQL server: [%s]",pgsql);
        if (pgsql)
        {
            PQfinish(pgsql);
        }
        RETURN_FALSE;
    }

    swConnection *_socket = swReactor_get(SwooleG.main_reactor, fd);
    _socket->object = pg_object;
    _socket->active = 0;

    php_context *sw_current_context = swoole_get_property(getThis(), 0);
    if (!sw_current_context)
    {
        sw_current_context = emalloc(sizeof(php_context));
        swoole_set_property(getThis(), 0, sw_current_context);
    }
    sw_current_context->state = SW_CORO_CONTEXT_RUNNING;
    sw_current_context->onTimeout = NULL;
    #if PHP_MAJOR_VERSION < 7
    sw_current_context->coro_params = getThis();
    #else
    sw_current_context->coro_params = *getThis();
    #endif

    //TODO:  add the timeout
    /*
    if (pg_object->timeout > 0)
    {
        php_swoole_check_timer((int) (ph_object->timeout * 1000));
        pg_object->timer = SwooleG.timer.add(&SwooleG.timer, (int) (redis->timeout * 1000), 0, sw_current_context, swoole_pgsql_coro_onTimeout);
    }*/
    coro_save(sw_current_context);
    coro_yield();

}

static int swoole_pgsql_coro_onWrite(swReactor *reactor, swEvent *event)
{
    char *errMsg;
#if PHP_MAJOR_VERSION < 7
    TSRMLS_FETCH_FROM_CTX(sw_thread_ctx ? sw_thread_ctx : NULL);
#endif

    if (event->socket->active)
    {
        return swReactor_onWrite(SwooleG.main_reactor, event);
    }

    socklen_t len = sizeof(SwooleG.error);
    if (getsockopt(event->fd, SOL_SOCKET, SO_ERROR, &SwooleG.error, &len) < 0)
    {
        swWarn("getsockopt(%d) failed. Error: %s[%d]", event->fd, strerror(errno), errno);
        return SW_ERR;
    }

    pg_object *pg_object = event->socket->object;


    // wait the connection ok
    ConnStatusType status =  PQstatus(pg_object->conn);
    if(status != CONNECTION_OK){
        PostgresPollingStatusType flag = PGRES_POLLING_WRITING;
        for (;;)
        {
            switch (flag)
            {
                case PGRES_POLLING_OK:
                    break;
                case PGRES_POLLING_READING:
                    break;
                case PGRES_POLLING_WRITING:
                    break;
                case PGRES_POLLING_FAILED:
                    errMsg = PQerrorMessage(pg_object->conn);
                    swWarn("error:[%s]",errMsg);
                    break;
                default:
                    break;
            }

            flag = PQconnectPoll(pg_object->conn);
            if(flag == PGRES_POLLING_OK )
            {
                break;
            }
            if(flag == PGRES_POLLING_FAILED )
            {
                errMsg = PQerrorMessage(pg_object->conn);
                swWarn("error:[%s] please cofirm that the connection configuration is correct \n",errMsg);
                break;
            }
        }

    }
    //listen read event
    SwooleG.main_reactor->set(SwooleG.main_reactor, event->fd, PHP_SWOOLE_FD_POSTGRESQL | SW_EVENT_READ);
    //connected
    event->socket->active = 1;

    php_context *sw_current_context = swoole_get_property(pg_object->object, 0);

    zval *retval = NULL;
    zval return_value;
    ZVAL_RES(&return_value, zend_register_resource(pg_object->conn, le_link));

    int ret = coro_resume(sw_current_context, &return_value, &retval);
    if (ret == CORO_END && retval)
    {
        sw_zval_ptr_dtor(&retval);
    }
    return SW_OK;
}

static int swoole_pgsql_coro_onRead(swReactor *reactor, swEvent *event)
{
    pg_object *pg_object = (event->socket->object);

#if PHP_MAJOR_VERSION < 7
    TSRMLS_FETCH_FROM_CTX(sw_thread_ctx ? sw_thread_ctx : NULL);
#endif

    switch (pg_object->request_type)
    {
        case NORMAL_QUERY:
            query_result_parse(pg_object);
            break;
        case META_DATA:
            meta_data_result_parse(pg_object);
            break;
    }

    return SW_OK;
}

static  int meta_data_result_parse(pg_object *pg_object)
{

    int i, num_rows;
    zval elem;
    PGresult *pg_result;
    zend_bool extended=0;
    pg_result =PQgetResult(pg_object->conn);

    if (PQresultStatus(pg_result) != PGRES_TUPLES_OK || (num_rows = PQntuples(pg_result)) == 0)
    {
        php_error_docref(NULL, E_WARNING, "Table doesn't exists");
        return  0;
    }

    zval  return_value;
    array_init(&return_value);
    zval * retval = NULL;
    array_init(&elem);
    for (i = 0; i < num_rows; i++)
    {
        pg_object->result = pg_result;
        char *name;
        /* pg_attribute.attnum */
        add_assoc_long_ex(&elem, "num", sizeof("num") - 1, atoi(PQgetvalue(pg_result, i, 1)));
        /* pg_type.typname */
        add_assoc_string_ex(&elem, "type", sizeof("type") - 1, PQgetvalue(pg_result, i, 2));
        /* pg_attribute.attlen */
        add_assoc_long_ex(&elem, "len", sizeof("len") - 1, atoi(PQgetvalue(pg_result, i, 3)));
        /* pg_attribute.attnonull */
        add_assoc_bool_ex(&elem, "not null", sizeof("not null") - 1, !strcmp(PQgetvalue(pg_result, i, 4), "t"));
        /* pg_attribute.atthasdef */
        add_assoc_bool_ex(&elem, "has default", sizeof("has default") - 1,
                          !strcmp(PQgetvalue(pg_result, i, 5), "t"));
        /* pg_attribute.attndims */
        add_assoc_long_ex(&elem, "array dims", sizeof("array dims") - 1, atoi(PQgetvalue(pg_result, i, 6)));
        /* pg_type.typtype */
        add_assoc_bool_ex(&elem, "is enum", sizeof("is enum") - 1, !strcmp(PQgetvalue(pg_result, i, 7), "e"));
        if (extended) {
            /* pg_type.typtype */
            add_assoc_bool_ex(&elem, "is base", sizeof("is base") - 1, !strcmp(PQgetvalue(pg_result, i, 7), "b"));
            add_assoc_bool_ex(&elem, "is composite", sizeof("is composite") - 1,
                              !strcmp(PQgetvalue(pg_result, i, 7), "c"));
            add_assoc_bool_ex(&elem, "is pesudo", sizeof("is pesudo") - 1,
                              !strcmp(PQgetvalue(pg_result, i, 7), "p"));
            /* pg_description.description */
            add_assoc_string_ex(&elem, "description", sizeof("description") - 1, PQgetvalue(pg_result, i, 8));
        }
        /* pg_attribute.attname */
        name = PQgetvalue(pg_result, i, 0);
        add_assoc_zval(&return_value, name, &elem);

    }
    php_context *sw_current_context = swoole_get_property(pg_object->object, 0);
    int ret  = coro_resume(sw_current_context, &return_value, &retval);
    if (ret == CORO_END && retval)
    {
        sw_zval_ptr_dtor(&retval);
    }
    zval_ptr_dtor(&return_value);
    zval_ptr_dtor(&elem);
    return SW_OK;
}


static  int query_result_parse(pg_object *pg_object)
{

    PGresult *pgsql_result;

    ExecStatusType status;


    int error = 0;
    char *errMsg;
    int ret, res;
    zval *retval = NULL;
    zval return_value;
    php_context *sw_current_context = swoole_get_property(pg_object->object, 0);

    pgsql_result =PQgetResult(pg_object->conn);

    status = PQresultStatus(pgsql_result);

    switch (status) {
        case PGRES_EMPTY_QUERY:
        case PGRES_BAD_RESPONSE:
        case PGRES_NONFATAL_ERROR:
        case PGRES_FATAL_ERROR:
            errMsg = PQerrorMessage(pg_object->conn);
            swWarn("Query failed: [%s]",errMsg);

            PQclear(pgsql_result);
            ZVAL_FALSE(&return_value);
            ret = coro_resume(sw_current_context, &return_value,  &retval);
            if (ret == CORO_END && retval)
            {
                sw_zval_ptr_dtor(&retval);
            }
            swoole_postgresql_coro_close(pg_object);
            break;
        case PGRES_COMMAND_OK: /* successful command that did not return rows */
        default:
            pg_object->result = pgsql_result;
            pg_object->row = 0;
            /* Wait to finish sending buffer */
            res = PQflush(pg_object->conn);

            ZVAL_RES(&return_value, zend_register_resource(pg_object, le_result));
            ret = coro_resume(sw_current_context, &return_value,  &retval);
            if (ret == CORO_END && retval)
            {
                sw_zval_ptr_dtor(&retval);
            }
            PQclear(pgsql_result);

            if (error != 0)
            {
                swoole_php_fatal_error(E_WARNING, "swoole_event->onError[1]: socket error. Error: %s [%d]", strerror(error), error);
            }

            break;
    }

    return SW_OK;
}

static PHP_METHOD(swoole_postgresql_coro, query)
{
    zval *query;
    PGconn *pgsql;
    PGresult *pgsql_result;

    ZEND_PARSE_PARAMETERS_START(1,1)
        Z_PARAM_ZVAL(query)
    ZEND_PARSE_PARAMETERS_END();

    pg_object *pg_object = swoole_get_object(getThis());
    pg_object->request_type = NORMAL_QUERY;
    pgsql = pg_object -> conn;

    while ((pgsql_result = PQgetResult(pgsql)))
    {
        PQclear(pgsql_result);
    }

    int ret  = PQsendQuery(pgsql, Z_STRVAL_P(query));
    if(ret == 0)
    {
        char * errMsg = PQerrorMessage(pgsql);
        swWarn("error:[%s]",errMsg);

    }

    php_context *sw_current_context = swoole_get_property(getThis(), 0);
    sw_current_context->state = SW_CORO_CONTEXT_RUNNING;
    sw_current_context->onTimeout = NULL;
    #if PHP_MAJOR_VERSION < 7
    sw_current_context->coro_params = getThis();
    #else
    sw_current_context->coro_params = *getThis();
    #endif

    //TODO:  add the timeout
    /*
        if (pg_object->timeout > 0)
        {
            php_swoole_check_timer((int) (ph_object->timeout * 1000));
            pg_object->timer = SwooleG.timer.add(&SwooleG.timer, (int) (redis->timeout * 1000), 0, sw_current_context, swoole_pgsql_coro_onTimeout);
        }*/
    coro_save(sw_current_context);
    coro_yield();
}

/* {{{ php_pgsql_result2array
 */
int swoole_pgsql_result2array(PGresult *pg_result, zval *ret_array, long result_type)
{
    zval row;
    char *field_name;
    size_t num_fields;
    int pg_numrows, pg_row;
    uint32_t i;
    assert(Z_TYPE_P(ret_array) == IS_ARRAY);

    if ((pg_numrows = PQntuples(pg_result)) <= 0)
    {
        return FAILURE;
    }
    for (pg_row = 0; pg_row < pg_numrows; pg_row++)
    {
        array_init(&row);
        for (i = 0, num_fields = PQnfields(pg_result); i < num_fields; i++)
        {
            field_name = PQfname(pg_result, i);
            if (PQgetisnull(pg_result, pg_row, i))
            {
                if (result_type & PGSQL_ASSOC)
                {
                    add_assoc_null(&row, field_name);
                }
                if (result_type & PGSQL_NUM)
                {
                    add_next_index_null(&row);
                }
            }
            else
            {
                char *element = PQgetvalue(pg_result, pg_row, i);
                if (element)
                {
                    const size_t element_len = strlen(element);
                    if (result_type & PGSQL_ASSOC)
                    {
                        add_assoc_stringl(&row, field_name, element, element_len);
                    }
                    if (result_type & PGSQL_NUM)
                    {
                        add_next_index_stringl(&row, element, element_len);
                    }
                }
            }
        }
        add_index_zval(ret_array, pg_row, &row);
    }
    return SUCCESS;
}

static PHP_METHOD(swoole_postgresql_coro, fetchAll)
{
    zval *result;
    PGresult *pgsql_result;
    pg_object *object;
    zend_long result_type = PGSQL_ASSOC;

    ZEND_PARSE_PARAMETERS_START(1,2)
        Z_PARAM_RESOURCE(result)
        Z_PARAM_OPTIONAL
        Z_PARAM_LONG(result_type)
    ZEND_PARSE_PARAMETERS_END();

    if ((object = (pg_object *)zend_fetch_resource(Z_RES_P(result), "PostgreSQL result", le_result)) == NULL)
    {
        RETURN_FALSE;
    }

    pgsql_result = object->result;
    array_init(return_value);
    if (swoole_pgsql_result2array(pgsql_result, return_value, result_type) == FAILURE)
    {
        zval_dtor(return_value);
        RETURN_FALSE;
    }
}

static PHP_METHOD(swoole_postgresql_coro,affectedRows)
{
    zval *result;
    PGresult *pgsql_result;
    pg_object *object;

    ZEND_PARSE_PARAMETERS_START(1,1)
        Z_PARAM_RESOURCE(result)
    ZEND_PARSE_PARAMETERS_END();

    if ((object = (pg_object *)zend_fetch_resource(Z_RES_P(result), "PostgreSQL result", le_result)) == NULL)
    {
        RETURN_FALSE;
    }

    pgsql_result = object->result;
    RETVAL_LONG(atoi(PQcmdTuples(pgsql_result)));
}


//query's num
static PHP_METHOD(swoole_postgresql_coro,numRows)
{
    zval *result;
    PGresult *pgsql_result;
    pg_object *object;

    ZEND_PARSE_PARAMETERS_START(1,1)
        Z_PARAM_RESOURCE(result)
    ZEND_PARSE_PARAMETERS_END();

    if ((object = (pg_object *)zend_fetch_resource(Z_RES_P(result), "PostgreSQL result", le_result)) == NULL)
    {
        RETURN_FALSE;
    }

    pgsql_result = object->result;
    RETVAL_LONG(PQntuples(pgsql_result));
}

static PHP_METHOD(swoole_postgresql_coro,metaData)
{

    char *table_name;
    size_t table_name_len;
    zend_bool extended=0;
    PGconn *pgsql;

    PGresult *pg_result;
    char *src, *tmp_name, *tmp_name2 = NULL;
    char *escaped;
    smart_str querystr = {0};
    size_t new_len;


    ZEND_PARSE_PARAMETERS_START(1,1)
        Z_PARAM_STRING(table_name, table_name_len)
    ZEND_PARSE_PARAMETERS_END();

    pg_object *pg_object = swoole_get_object(getThis());
    pg_object->request_type = META_DATA;
    pgsql = pg_object -> conn;


    while ((pg_result = PQgetResult(pgsql)))
    {
        PQclear(pg_result);
    }

    if (!*table_name)
    {
        php_error_docref(NULL, E_WARNING, "The table name must be specified");
        RETURN_FALSE;
    }

    src = estrdup(table_name);
    tmp_name = php_strtok_r(src, ".", &tmp_name2);
    if (!tmp_name)
    {
        efree(src);
        php_error_docref(NULL, E_WARNING, "The table name must be specified");
        RETURN_FALSE;
    }
    if (!tmp_name2 || !*tmp_name2)
    {
        /* Default schema */
        tmp_name2 = tmp_name;
        tmp_name = "public";
    }

    if (extended)
    {
        smart_str_appends(&querystr,
                          "SELECT a.attname, a.attnum, t.typname, a.attlen, a.attnotNULL, a.atthasdef, a.attndims, t.typtype, "
                                  "d.description "
                                  "FROM pg_class as c "
                                  " JOIN pg_attribute a ON (a.attrelid = c.oid) "
                                  " JOIN pg_type t ON (a.atttypid = t.oid) "
                                  " JOIN pg_namespace n ON (c.relnamespace = n.oid) "
                                  " LEFT JOIN pg_description d ON (d.objoid=a.attrelid AND d.objsubid=a.attnum AND c.oid=d.objoid) "
                                  "WHERE a.attnum > 0  AND c.relname = '");
    }
    else
    {
        smart_str_appends(&querystr,
                          "SELECT a.attname, a.attnum, t.typname, a.attlen, a.attnotnull, a.atthasdef, a.attndims, t.typtype "
                                  "FROM pg_class as c "
                                  " JOIN pg_attribute a ON (a.attrelid = c.oid) "
                                  " JOIN pg_type t ON (a.atttypid = t.oid) "
                                  " JOIN pg_namespace n ON (c.relnamespace = n.oid) "
                                  "WHERE a.attnum > 0 AND c.relname = '");
    }
    escaped = (char *)safe_emalloc(strlen(tmp_name2), 2, 1);
    new_len = PQescapeStringConn(pgsql, escaped, tmp_name2, strlen(tmp_name2), NULL);
    if (new_len)
    {
        smart_str_appendl(&querystr, escaped, new_len);
    }
    efree(escaped);

    smart_str_appends(&querystr, "' AND n.nspname = '");
    escaped = (char *)safe_emalloc(strlen(tmp_name), 2, 1);
    new_len = PQescapeStringConn(pgsql, escaped, tmp_name, strlen(tmp_name), NULL);
    if (new_len)
    {
        smart_str_appendl(&querystr, escaped, new_len);
    }
    efree(escaped);

    smart_str_appends(&querystr, "' ORDER BY a.attnum;");
    smart_str_0(&querystr);
    efree(src);

    //pg_result = PQexec(pgsql, ZSTR_VAL(querystr.s));


    int ret  = PQsendQuery(pgsql, ZSTR_VAL(querystr.s));
    if(ret == 0)
    {
        char * errMsg = PQerrorMessage(pgsql);
        swWarn("error:[%s]",errMsg);

    }
    smart_str_free(&querystr);

    php_context *sw_current_context = swoole_get_property(getThis(), 0);
    sw_current_context->state = SW_CORO_CONTEXT_RUNNING;
    sw_current_context->onTimeout = NULL;
#if PHP_MAJOR_VERSION < 7
        sw_current_context->coro_params = getThis();
#else
        sw_current_context->coro_params = *getThis();
#endif
        /*
            if (redis->timeout > 0)
            {
                php_swoole_check_timer((int) (redis->timeout * 1000));
                redis->timer = SwooleG.timer.add(&SwooleG.timer, (int) (redis->timeout * 1000), 0, sw_current_context, swoole_redis_coro_onTimeout);
            }*/
        coro_save(sw_current_context);
        coro_yield();

}

/* {{{ void php_pgsql_fetch_hash */
static void php_pgsql_fetch_hash(INTERNAL_FUNCTION_PARAMETERS, zend_long result_type, int into_object)
{
    zval               *result, *zrow = NULL;
    PGresult           *pgsql_result;
    pg_object          *pg_result;
    int                i, num_fields, pgsql_row, use_row;
    zend_long          row = -1;
    char               *field_name;
    zval               *ctor_params = NULL;
    zend_class_entry   *ce = NULL;

    if (into_object)
    {
        zend_string *class_name = NULL;

        if (zend_parse_parameters(ZEND_NUM_ARGS(), "r|z!Sz", &result, &zrow, &class_name, &ctor_params) == FAILURE)
        {
            return;
        }
        if (!class_name)
        {
            ce = zend_standard_class_def;
        } else {
            ce = zend_fetch_class(class_name, ZEND_FETCH_CLASS_AUTO);
        }
        if (!ce)
        {
            php_error_docref(NULL, E_WARNING, "Could not find class '%s'", ZSTR_VAL(class_name));
            return;
        }
        result_type = PGSQL_ASSOC;
    }
    else
    {
        if (zend_parse_parameters(ZEND_NUM_ARGS(), "r|z!l", &result, &zrow, &result_type) == FAILURE)
        {
            return;
        }
    }
    if (zrow == NULL)
    {
        row = -1;
    } else {
        convert_to_long(zrow);
        row = Z_LVAL_P(zrow);
        if (row < 0) {
            php_error_docref(NULL, E_WARNING, "The row parameter must be greater or equal to zero");
            RETURN_FALSE;
        }
    }
    use_row = ZEND_NUM_ARGS() > 1 && row != -1;

    if (!(result_type & PGSQL_BOTH))
    {
        php_error_docref(NULL, E_WARNING, "Invalid result type");
        RETURN_FALSE;
    }

    if ((pg_result = (pg_object *)zend_fetch_resource(Z_RES_P(result), "PostgreSQL result", le_result)) == NULL)
    {
        RETURN_FALSE;
    }

    pgsql_result = pg_result->result;

    if (use_row)
    {
        if (row < 0 || row >= PQntuples(pgsql_result))
        {
            php_error_docref(NULL, E_WARNING, "Unable to jump to row " ZEND_LONG_FMT " on PostgreSQL result index " ZEND_LONG_FMT,
                    row, Z_LVAL_P(result));
            RETURN_FALSE;
        }
        pgsql_row = (int)row;
        pg_result->row = pgsql_row;
    }
    else
    {
        /* If 2nd param is NULL, use internal row counter to access next row */
        pgsql_row = pg_result->row;
        if (pgsql_row < 0 || pgsql_row >= PQntuples(pgsql_result)) {
            RETURN_FALSE;
        }
        pg_result->row++;
    }

    array_init(return_value);
    for (i = 0, num_fields = PQnfields(pgsql_result); i < num_fields; i++)
    {
        if (PQgetisnull(pgsql_result, pgsql_row, i)) {
            if (result_type & PGSQL_NUM)
            {
                add_index_null(return_value, i);
            }
            if (result_type & PGSQL_ASSOC)
            {
                field_name = PQfname(pgsql_result, i);
                add_assoc_null(return_value, field_name);
            }
        }
        else
        {
            char *element = PQgetvalue(pgsql_result, pgsql_row, i);
            if (element)
            {
                const size_t element_len = strlen(element);

                if (result_type & PGSQL_NUM)
                {
                    add_index_stringl(return_value, i, element, element_len);
                }

                if (result_type & PGSQL_ASSOC)
                {
                    field_name = PQfname(pgsql_result, i);
                    add_assoc_stringl(return_value, field_name, element, element_len);
                }
            }
        }
    }

    if (into_object)
    {
        zval dataset;
        zend_fcall_info fci;
        zend_fcall_info_cache fcc;
        zval retval;

        ZVAL_COPY_VALUE(&dataset, return_value);
        object_and_properties_init(return_value, ce, NULL);
        if (!ce->default_properties_count && !ce->__set)
        {
            Z_OBJ_P(return_value)->properties = Z_ARR(dataset);
        }
        else
        {
            zend_merge_properties(return_value, Z_ARRVAL(dataset));
            zval_ptr_dtor(&dataset);
        }

        if (ce->constructor)
        {
            fci.size = sizeof(fci);
            ZVAL_UNDEF(&fci.function_name);
            fci.object = Z_OBJ_P(return_value);
            fci.retval = &retval;
            fci.params = NULL;
            fci.param_count = 0;
            fci.no_separation = 1;

            if (ctor_params && Z_TYPE_P(ctor_params) != IS_NULL)
            {
                if (zend_fcall_info_args(&fci, ctor_params) == FAILURE) {
                    /* Two problems why we throw exceptions here: PHP is typeless
                     * and hence passing one argument that's not an array could be
                     * by mistake and the other way round is possible, too. The
                     * single value is an array. Also we'd have to make that one
                     * argument passed by reference.
                     */
                    zend_throw_exception(zend_ce_exception, "Parameter ctor_params must be an array", 0);
                    return;
                }
            }

            fcc.initialized = 1;
            fcc.function_handler = ce->constructor;
#if PHP_MINOR_VERSION > 0
            fcc.calling_scope = zend_get_executed_scope();
#else
            fcc.calling_scope = EG(scope);
#endif
            fcc.called_scope = Z_OBJCE_P(return_value);
            fcc.object = Z_OBJ_P(return_value);

            if (zend_call_function(&fci, &fcc) == FAILURE)
            {
                zend_throw_exception_ex(zend_ce_exception, 0, "Could not execute %s::%s()", ZSTR_VAL(ce->name), ZSTR_VAL(ce->constructor->common.function_name));
            } else
            {
                zval_ptr_dtor(&retval);
            }
            if (fci.params)
            {
                efree(fci.params);
            }
        }
        else if (ctor_params)
        {
            zend_throw_exception_ex(zend_ce_exception, 0, "Class %s does not have a constructor hence you cannot use ctor_params", ZSTR_VAL(ce->name));
        }
    }
}
/* }}} */

/* {{{ proto array fetchRow(resource result [, int row [, int result_type]])
   Get a row as an enumerated array */
static PHP_METHOD(swoole_postgresql_coro,fetchRow)
{
    php_pgsql_fetch_hash(INTERNAL_FUNCTION_PARAM_PASSTHRU, PGSQL_NUM, 0);
}
/* }}} */

/* {{{ proto array fetchAssoc(resource result [, int row])
   Fetch a row as an assoc array */
static PHP_METHOD(swoole_postgresql_coro,fetchAssoc)
{
    /* pg_fetch_assoc() is added from PHP 4.3.0. It should raise error, when
       there is 3rd parameter */
    if (ZEND_NUM_ARGS() > 2)
        WRONG_PARAM_COUNT;
    php_pgsql_fetch_hash(INTERNAL_FUNCTION_PARAM_PASSTHRU, PGSQL_ASSOC, 0);
}
/* }}} */

/* {{{ proto array fetchArray(resource result [, int row [, int result_type]])
   Fetch a row as an array */
static PHP_METHOD(swoole_postgresql_coro,fetchArray)
{
    php_pgsql_fetch_hash(INTERNAL_FUNCTION_PARAM_PASSTHRU, PGSQL_BOTH, 0);
}
/* }}} */

/* {{{ proto object fetchObject(resource result [, int row [, string class_name [, NULL|array ctor_params]]])
   Fetch a row as an object */
static PHP_METHOD(swoole_postgresql_coro,fetchObject)
{
    /* fetchObject() allowed result_type used to be. 3rd parameter
       must be allowed for compatibility */
    php_pgsql_fetch_hash(INTERNAL_FUNCTION_PARAM_PASSTHRU, PGSQL_ASSOC, 1);
}

/* {{{ _destroy_pgsql_link
 */
static void _destroy_pgsql_link(zend_resource *rsrc)
{
    PGconn *link = (PGconn *)rsrc->ptr;
    PGresult *res;

    while ((res = PQgetResult(link)))
    {
        PQclear(res);
    }
    PQfinish(link);
}

static void _free_result(zend_resource *rsrc)
{
    pg_object *pg_result = (pg_object *)rsrc->ptr;

    efree(pg_result);
}

static int swoole_pgsql_coro_onError(swReactor *reactor, swEvent *event)
{
#if PHP_MAJOR_VERSION < 7
    TSRMLS_FETCH_FROM_CTX(sw_thread_ctx ? sw_thread_ctx : NULL);
#endif

    pg_object *pg_object = (event->socket->object);
    zval *retval = NULL, *result;
    zval *zobject  = pg_object->object;

    swoole_postgresql_coro_close(pg_object);

    SW_ALLOC_INIT_ZVAL(result);
    ZVAL_BOOL(result, 0);

    php_context *sw_current_context = swoole_get_property(zobject, 0);
    int ret = coro_resume(sw_current_context, result, &retval);
    sw_zval_free(result);

    if (ret == CORO_END && retval)
    {
        sw_zval_ptr_dtor(&retval);
    }

    return SW_OK;
}

static PHP_METHOD(swoole_postgresql_coro, __destruct)
{
    pg_object *pg_object = swoole_get_object(getThis());
    swoole_postgresql_coro_close(pg_object);

}

static int swoole_postgresql_coro_close(pg_object *pg_object)
{
    if (!pg_object)
    {
        swoole_php_fatal_error(E_WARNING, "object is not instanceof swoole_postgresql_coro.");
        return FAILURE;
    }
    SwooleG.main_reactor->del(SwooleG.main_reactor, pg_object->fd);

    swConnection *_socket = swReactor_get(SwooleG.main_reactor, pg_object->fd);
    _socket->object = NULL;
    _socket->active = 0;
    efree(pg_object);
    if(pg_object->object)
    {
        php_context *sw_current_context = swoole_get_property(pg_object->object, 0);
        efree(sw_current_context);
    }

    return SUCCESS;
}
#endif
#endif