Skip to content

Commit

Permalink
Common code for php 5 and 7
Browse files Browse the repository at this point in the history
  • Loading branch information
arnaud-lb committed Dec 18, 2016
1 parent c33871b commit 319967c
Show file tree
Hide file tree
Showing 25 changed files with 615 additions and 280 deletions.
2 changes: 2 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ php:
- 5.4
- 5.5
- 5.6
- 7.0
- 7.1
env:
- LIBRDKAFKA_VERSION=0.8.6
- LIBRDKAFKA_VERSION=0.8
Expand Down
2 changes: 1 addition & 1 deletion CREDITS
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
rdkafka
Arnaud Le Blanc [lbarnaud] <[email protected]> (lead)
Arnaud Le Blanc
198 changes: 86 additions & 112 deletions conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,42 +31,51 @@
#include "conf.h"
#include "topic_partition.h"
#include "message.h"
#include "zeval.h"

zend_class_entry * ce_kafka_conf;
zend_class_entry * ce_kafka_topic_conf;

static zend_object_handlers handlers;

static void kafka_conf_callback_dtor(kafka_conf_callback *cb TSRMLS_DC) /* {{{ */
{
if (cb->fci.function_name) {
if (cb) {
zval_ptr_dtor(&cb->fci.function_name);
efree(cb);
}
} /* }}} */

void kafka_conf_callbacks_dtor(kafka_conf_callbacks *cbs TSRMLS_DC) /* {{{ */
{
kafka_conf_callback_dtor(&cbs->error TSRMLS_CC);
kafka_conf_callback_dtor(&cbs->rebalance TSRMLS_CC);
kafka_conf_callback_dtor(&cbs->dr_msg TSRMLS_CC);
kafka_conf_callback_dtor(cbs->error TSRMLS_CC);
kafka_conf_callback_dtor(cbs->rebalance TSRMLS_CC);
kafka_conf_callback_dtor(cbs->dr_msg TSRMLS_CC);
} /* }}} */

static void kafka_conf_callback_copy(kafka_conf_callback *to, kafka_conf_callback *from TSRMLS_DC) /* {{{ */
static void kafka_conf_callback_copy(kafka_conf_callback **to, kafka_conf_callback *from TSRMLS_DC) /* {{{ */
{
*to = *from;
if (to->fci.function_name) {
Z_ADDREF_P(to->fci.function_name);
if (from) {
*to = emalloc(sizeof(**to));
**to = *from;
#if PHP_MAJOR_VERSION >= 7
zval_copy_ctor(&(*to)->fci.function_name);
#else
Z_ADDREF_P((*to)->fci.function_name);
#endif
}
} /* }}} */

void kafka_conf_callbacks_copy(kafka_conf_callbacks *to, kafka_conf_callbacks *from TSRMLS_DC) /* {{{ */
{
kafka_conf_callback_copy(&to->error, &from->error TSRMLS_CC);
kafka_conf_callback_copy(&to->rebalance, &from->rebalance TSRMLS_CC);
kafka_conf_callback_copy(&to->dr_msg, &from->dr_msg TSRMLS_CC);
kafka_conf_callback_copy(&to->error, from->error TSRMLS_CC);
kafka_conf_callback_copy(&to->rebalance, from->rebalance TSRMLS_CC);
kafka_conf_callback_copy(&to->dr_msg, from->dr_msg TSRMLS_CC);
} /* }}} */

static void kafka_conf_free(void *object TSRMLS_DC) /* {{{ */
static void kafka_conf_free(zend_object *object TSRMLS_DC) /* {{{ */
{
kafka_conf_object *intern = (kafka_conf_object*)object;
kafka_conf_object *intern = get_custom_object(kafka_conf_object, object);

switch (intern->type) {
case KAFKA_CONF:
Expand All @@ -84,7 +93,7 @@ static void kafka_conf_free(void *object TSRMLS_DC) /* {{{ */

zend_object_std_dtor(&intern->std TSRMLS_CC);

efree(intern);
free_custom_object(intern);
}
/* }}} */

Expand All @@ -97,8 +106,8 @@ static zend_object_value kafka_conf_new(zend_class_entry *class_type TSRMLS_DC)
zend_object_std_init(&intern->std, class_type TSRMLS_CC);
object_properties_init(&intern->std, class_type);

retval.handle = zend_objects_store_put(&intern->std, (zend_objects_store_dtor_t) zend_objects_destroy_object, kafka_conf_free, NULL TSRMLS_CC);
retval.handlers = &kafka_object_handlers;
STORE_OBJECT(retval, intern, (zend_objects_store_dtor_t) zend_objects_destroy_object, kafka_conf_free, NULL);
SET_OBJECT_HANDLERS(retval, &handlers);

return retval;
}
Expand All @@ -119,131 +128,86 @@ kafka_conf_object * get_kafka_conf_object(zval *zconf TSRMLS_DC)
static void kafka_conf_error_cb(rd_kafka_t *rk, int err, const char *reason, void *opaque)
{
kafka_conf_callbacks *cbs = (kafka_conf_callbacks*) opaque;
zval *retval;
zval **args[3];
zval *zrk;
zval *zerr;
zval *zreason;
zeval args[3];
TSRMLS_FETCH();

if (!opaque) {
return;
}

if (!cbs->error.fci.function_name) {
if (!cbs->error) {
return;
}

ALLOC_INIT_ZVAL(zrk);
KAFKA_ZVAL_ZVAL(zrk, &cbs->rk, 1, 0);

ALLOC_INIT_ZVAL(zerr);
ZVAL_LONG(zerr, err);

ALLOC_INIT_ZVAL(zreason);
ZVAL_STRING(zreason, reason, 1);
MAKE_STD_ZEVAL(args[0]);
MAKE_STD_ZEVAL(args[1]);
MAKE_STD_ZEVAL(args[2]);

args[0] = &zrk;
args[1] = &zerr;
args[2] = &zreason;
KAFKA_ZVAL_ZVAL(P_ZEVAL(args[0]), &cbs->rk, 1, 0);
ZVAL_LONG(P_ZEVAL(args[1]), err);
RDKAFKA_ZVAL_STRING(P_ZEVAL(args[2]), reason);

cbs->error.fci.retval_ptr_ptr = &retval;
cbs->error.fci.params = args;
cbs->error.fci.param_count = 3;
rdkafka_call_function(&cbs->error->fci, &cbs->error->fcc, NULL, 3, args TSRMLS_CC);

zend_call_function(&cbs->error.fci, &cbs->error.fcc TSRMLS_CC);

if (retval) {
zval_ptr_dtor(&retval);
}
zval_ptr_dtor(&zrk);
zval_ptr_dtor(&zerr);
zval_ptr_dtor(&zreason);
zval_ptr_dtor(&args[0]);
zval_ptr_dtor(&args[1]);
zval_ptr_dtor(&args[2]);
}

static void kafka_conf_dr_msg_cb(rd_kafka_t *rk, const rd_kafka_message_t *msg, void *opaque)
{
kafka_conf_callbacks *cbs = (kafka_conf_callbacks*) opaque;
zval *retval;
zval **args[2];
zval *zrk;
zval *zrkmsg;
zeval args[2];
TSRMLS_FETCH();

if (!opaque) {
return;
}

if (!cbs->dr_msg.fci.function_name) {
if (!cbs->dr_msg) {
return;
}

ALLOC_INIT_ZVAL(zrk);
KAFKA_ZVAL_ZVAL(zrk, &cbs->rk, 1, 0);

ALLOC_INIT_ZVAL(zrkmsg);
kafka_message_new(zrkmsg, msg TSRMLS_CC);
MAKE_STD_ZEVAL(args[0]);
MAKE_STD_ZEVAL(args[1]);

args[0] = &zrk;
args[1] = &zrkmsg;
KAFKA_ZVAL_ZVAL(P_ZEVAL(args[0]), &cbs->rk, 1, 0);
kafka_message_new(P_ZEVAL(args[1]), msg TSRMLS_CC);

cbs->dr_msg.fci.retval_ptr_ptr = &retval;
cbs->dr_msg.fci.params = args;
cbs->dr_msg.fci.param_count = 2;
rdkafka_call_function(&cbs->dr_msg->fci, &cbs->dr_msg->fcc, NULL, 2, args TSRMLS_CC);

zend_call_function(&cbs->dr_msg.fci, &cbs->dr_msg.fcc TSRMLS_CC);

if (retval) {
zval_ptr_dtor(&retval);
}
zval_ptr_dtor(&zrk);
zval_ptr_dtor(&zrkmsg);
zval_ptr_dtor(&args[0]);
zval_ptr_dtor(&args[1]);
}

#ifdef HAVE_NEW_KAFKA_CONSUMER
static void kafka_conf_rebalance_cb(rd_kafka_t *rk, rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t *partitions, void *opaque)
{
kafka_conf_callbacks *cbs = (kafka_conf_callbacks*) opaque;
zval *retval;
zval **args[3];
zval *zrk;
zval *zerr;
zval *zpartitions;
zeval args[3];
TSRMLS_FETCH();

if (!opaque) {
return;
}

if (!cbs->rebalance.fci.function_name) {
if (!cbs->rebalance) {
return;
}

ALLOC_INIT_ZVAL(zrk);
KAFKA_ZVAL_ZVAL(zrk, &cbs->rk, 1, 0);

ALLOC_INIT_ZVAL(zerr);
ZVAL_LONG(zerr, err);

ALLOC_INIT_ZVAL(zpartitions);
kafka_topic_partition_list_to_array(zpartitions, partitions TSRMLS_CC);
MAKE_STD_ZEVAL(args[0]);
MAKE_STD_ZEVAL(args[1]);
MAKE_STD_ZEVAL(args[2]);

args[0] = &zrk;
args[1] = &zerr;
args[2] = &zpartitions;
KAFKA_ZVAL_ZVAL(P_ZEVAL(args[0]), &cbs->rk, 1, 0);
ZVAL_LONG(P_ZEVAL(args[1]), err);
kafka_topic_partition_list_to_array(P_ZEVAL(args[2]), partitions TSRMLS_CC);

cbs->rebalance.fci.retval_ptr_ptr = &retval;
cbs->rebalance.fci.params = args;
cbs->rebalance.fci.param_count = 3;
rdkafka_call_function(&cbs->rebalance->fci, &cbs->rebalance->fcc, NULL, 3, args TSRMLS_CC);

zend_call_function(&cbs->rebalance.fci, &cbs->rebalance.fcc TSRMLS_CC);

if (retval) {
zval_ptr_dtor(&retval);
}
zval_ptr_dtor(&zrk);
zval_ptr_dtor(&zerr);
zval_ptr_dtor(&zpartitions);
zval_ptr_dtor(&args[0]);
zval_ptr_dtor(&args[1]);
zval_ptr_dtor(&args[2]);
}
#endif /* HAVE_NEW_KAFKA_CONSUMER */

Expand Down Expand Up @@ -310,7 +274,7 @@ PHP_METHOD(RdKafka__Conf, dump)
for (i = 0; i < cntp; i+=2) {
const char *key = dump[i];
const char *value = dump[i+1];
add_assoc_string(return_value, (char*)key, (char*)value, 1);
rdkafka_add_assoc_string(return_value, (char*)key, (char*)value);
}

rd_kafka_conf_dump_free(dump, cntp);
Expand All @@ -328,9 +292,9 @@ ZEND_END_ARG_INFO()
PHP_METHOD(RdKafka__Conf, set)
{
char *name;
int name_len;
arglen_t name_len;
char *value;
int value_len;
arglen_t value_len;
kafka_conf_object *intern;
rd_kafka_conf_res_t ret = 0;
char errstr[512];
Expand Down Expand Up @@ -425,14 +389,16 @@ PHP_METHOD(RdKafka__Conf, setErrorCb)
return;
}

Z_ADDREF_P(fci.function_name);
Z_ADDREF_P(P_ZEVAL(fci.function_name));

if (intern->cbs.error.fci.function_name) {
zval_ptr_dtor(&intern->cbs.error.fci.function_name);
if (intern->cbs.error) {
zval_ptr_dtor(&intern->cbs.error->fci.function_name);
} else {
intern->cbs.error = ecalloc(1, sizeof(*intern->cbs.error));
}

intern->cbs.error.fci = fci;
intern->cbs.error.fcc = fcc;
intern->cbs.error->fci = fci;
intern->cbs.error->fcc = fcc;

rd_kafka_conf_set_error_cb(intern->u.conf, kafka_conf_error_cb);
}
Expand Down Expand Up @@ -460,14 +426,16 @@ PHP_METHOD(RdKafka__Conf, setDrMsgCb)
return;
}

Z_ADDREF_P(fci.function_name);
Z_ADDREF_P(P_ZEVAL(fci.function_name));

if (intern->cbs.dr_msg.fci.function_name) {
zval_ptr_dtor(&intern->cbs.dr_msg.fci.function_name);
if (intern->cbs.dr_msg) {
zval_ptr_dtor(&intern->cbs.dr_msg->fci.function_name);
} else {
intern->cbs.dr_msg = ecalloc(1, sizeof(*intern->cbs.dr_msg));
}

intern->cbs.dr_msg.fci = fci;
intern->cbs.dr_msg.fcc = fcc;
intern->cbs.dr_msg->fci = fci;
intern->cbs.dr_msg->fcc = fcc;

rd_kafka_conf_set_dr_msg_cb(intern->u.conf, kafka_conf_dr_msg_cb);
}
Expand Down Expand Up @@ -496,14 +464,16 @@ PHP_METHOD(RdKafka__Conf, setRebalanceCb)
return;
}

Z_ADDREF_P(fci.function_name);
Z_ADDREF_P(P_ZEVAL(fci.function_name));

if (intern->cbs.rebalance.fci.function_name) {
zval_ptr_dtor(&intern->cbs.rebalance.fci.function_name);
if (intern->cbs.rebalance) {
zval_ptr_dtor(&intern->cbs.rebalance->fci.function_name);
} else {
intern->cbs.rebalance = ecalloc(1, sizeof(*intern->cbs.rebalance));
}

intern->cbs.rebalance.fci = fci;
intern->cbs.rebalance.fcc = fcc;
intern->cbs.rebalance->fci = fci;
intern->cbs.rebalance->fcc = fcc;

rd_kafka_conf_set_rebalance_cb(intern->u.conf, kafka_conf_rebalance_cb);
}
Expand Down Expand Up @@ -597,6 +567,10 @@ void kafka_conf_minit(TSRMLS_D)
{
zend_class_entry tmpce;

handlers = kafka_default_object_handlers;
set_object_handler_free_obj(&handlers, kafka_conf_free);
set_object_handler_offset(&handlers, XtOffsetOf(kafka_conf_object, std));

INIT_NS_CLASS_ENTRY(tmpce, "RdKafka", "Conf", kafka_conf_fe);
ce_kafka_conf = zend_register_internal_class(&tmpce TSRMLS_CC);
ce_kafka_conf->create_object = kafka_conf_new;
Expand Down
13 changes: 9 additions & 4 deletions conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,24 @@ typedef struct _kafka_conf_callback {

typedef struct _kafka_conf_callbacks {
zval rk;
kafka_conf_callback error;
kafka_conf_callback rebalance;
kafka_conf_callback dr_msg;
kafka_conf_callback *error;
kafka_conf_callback *rebalance;
kafka_conf_callback *dr_msg;
} kafka_conf_callbacks;

typedef struct _kafka_conf_object {
zend_object std;
#if PHP_MAJOR_VERSION < 7
zend_object std;
#endif
kafka_conf_type type;
union {
rd_kafka_conf_t *conf;
rd_kafka_topic_conf_t *topic_conf;
} u;
kafka_conf_callbacks cbs;
#if PHP_MAJOR_VERSION >= 7
zend_object std;
#endif
} kafka_conf_object;

kafka_conf_object * get_kafka_conf_object(zval *zconf TSRMLS_DC);
Expand Down
Loading

0 comments on commit 319967c

Please sign in to comment.