/* Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "dworker.h"

/*
 * NOTE(review): the targets of the system #include lines were missing from
 * the source this file was restored from.  The list below is reconstructed
 * from the APIs actually used in this file -- confirm against the build.
 */
#include <apr_general.h>
#include <apr_pools.h>
#include <apr_strings.h>
#include <apr_tables.h>
#include <apr_hash.h>
#include <apr_file_io.h>
#include <apr_file_info.h>
#include <apr_fnmatch.h>
#include <apr_dso.h>
#include <apr_network_io.h>
#include <apr_thread_proc.h>
#include <apr_thread_mutex.h>
#include <apr_signal.h>
#include <apr_buckets.h>

#include <errno.h>
#include <libgen.h>
#include <limits.h>
#include <stdlib.h>
#include <string.h>

/* Runtime configuration, filled in from the command line by main(). */
typedef struct dw_conf_t {
    const char *modules_path;
    const char *addr;
    int port;
    int threads;
    int verbose;
} dw_conf_t;

/* single worker thread */
typedef struct dw_worker_t dw_worker_t;
struct dw_worker_t {
    apr_thread_t *m_thread;
    apr_pool_t *m_pool;
    dw_ctxt_t *m_ctxt;
    dw_worker_t *next;
};

typedef struct dw_module_list_t dw_module_list_t;
struct dw_module_list_t {
    dw_module_t *m_module;
    dw_module_list_t *next;
};

struct dw_ctxt_t {
    apr_pool_t *m_pool;
    dw_conf_t m_conf;
    apr_hash_t *m_plugins;
    apr_file_t *errfile;
    apr_file_t *outfile;
    /* listening socket */
    apr_socket_t *m_lskt;
    /* only a single thread controls the listening socket at a time. */
    apr_thread_mutex_t *m_lmutex;
    /* linked list of worker threads. */
    dw_worker_t *m_threads;
    /* linked list of all loaded module handles */
    dw_module_list_t *m_modules;
    /* 0=stop, 1=continue.
     * NOTE(review): written from a signal handler and read by every worker
     * without any synchronisation; consider an atomic type. */
    int m_keep_running;
};

typedef struct dw_stats_t {
    apr_uint64_t requests;
    apr_uint64_t bytes;
} dw_stats_t;

/* One registered command handler; stored in dw_ctxt_t::m_plugins by name. */
typedef struct dw_plugin_t {
    dw_stats_t m_stats;
    dw_module_t *m_module;
    const char *m_name;
    dw_handler_cb m_cb;
    void *m_baton;
} dw_plugin_t;

/* Per-connection state created by dw_get_client(). */
struct dw_req_t {
    apr_socket_t *m_skt;
    apr_bucket_alloc_t *m_bballoc;
    /* unread input; initially holds a single socket bucket */
    apr_bucket_brigade *m_bbin;
    /* scratch brigade; always left empty between uses */
    apr_bucket_brigade *m_bbtmp;
};

#ifndef AP_IOBUFSIZE
#define AP_IOBUFSIZE 8192
#endif

#define DW_EOL "\r\n"
#define DW_EOL_LEN (sizeof(DW_EOL)-1)
#define DW_WS " "
#define DW_WS_LEN (sizeof(DW_WS)-1)
#define DW_INVOKE "invoke "
#define DW_INVOKE_LEN (sizeof(DW_INVOKE)-1)
#define DW_STATS "stats"
#define DW_STATS_LEN (sizeof(DW_STATS)-1)
#define DW_MAX_PARAM_LEN 524288 /* 512 kbytes */

/*
 * Log a client-caused error to the error file.
 *
 * Nothing is sent to the client yet (see XXXX below).  Always returns
 * APR_SUCCESS, so callers that must abort the connection have to return
 * their own error status.
 */
static apr_status_t dw_write_error(dw_ctxt_t *ctxt, dw_req_t *req,
                                   const char *message)
{
    (void)req;
    apr_file_printf(ctxt->errfile, "Client Error: %s\n", message);
    /* XXXX: Send ERR: to client. */
    return APR_SUCCESS;
}

/*
 * Write exactly `len` bytes to `skt`, retrying after short writes.
 *
 * apr_socket_send() reports the number of bytes actually written through
 * its length argument and may write fewer than requested.
 */
static apr_status_t dw_send_all(apr_socket_t *skt, const char *data,
                                apr_size_t len)
{
    while (len > 0) {
        apr_size_t sent = len;
        apr_status_t rv = apr_socket_send(skt, data, &sent);

        if (rv) {
            return rv;
        }
        if (sent == 0) {
            /* success with no progress: bail out instead of spinning */
            return APR_EGENERAL;
        }
        data += sent;
        len -= sent;
    }
    return APR_SUCCESS;
}

/*
 * Send a handler result to the client.
 *
 * Wire format: "RESULT <len>\r\n", the `len` payload bytes, "\r\nEND\r\n".
 *
 * @param req   request whose socket receives the reply
 * @param data  payload bytes
 * @param len   number of payload bytes
 * @return APR_SUCCESS, or the first socket error encountered
 */
apr_status_t dw_req_send_result(dw_req_t *req, const char *data,
                                apr_size_t len)
{
    static const char trailer[] = DW_EOL "END" DW_EOL;
    char header[128];
    apr_size_t hlen;
    apr_status_t rv;

    hlen = apr_snprintf(header, sizeof(header),
                        "RESULT %" APR_SIZE_T_FMT "\r\n", len);

    rv = dw_send_all(req->m_skt, header, hlen);
    if (rv) {
        return rv;
    }

    rv = dw_send_all(req->m_skt, data, len);
    if (rv) {
        return rv;
    }

    return dw_send_all(req->m_skt, trailer, sizeof(trailer) - 1);
}

/*
 * Read the parameter blob that follows an "invoke" line.
 *
 * @param len    0: no parameter; negative: read up to the next newline;
 *               positive: read exactly `len` bytes (at most
 *               DW_MAX_PARAM_LEN)
 * @param obuf   receives the data, allocated from `pool` (NULL when empty)
 * @param oblen  receives the number of bytes stored in *obuf
 * @return APR_SUCCESS, APR_EINVAL when `len` exceeds DW_MAX_PARAM_LEN, or
 *         the brigade/socket error.  *obuf and *oblen are always defined.
 */
static apr_status_t dw_handle_command_param(dw_ctxt_t *ctxt, dw_req_t *req,
                                            int len, char **obuf,
                                            apr_size_t *oblen,
                                            apr_pool_t *pool)
{
    apr_size_t blen = 0;
    char *buf = NULL;
    apr_status_t rv = APR_SUCCESS;

    *obuf = NULL;
    *oblen = 0;

    if (ctxt->m_conf.verbose) {
        apr_file_printf(ctxt->errfile, "command param len: %d\n", len);
    }

    if (len == 0) {
        /* XXXX: no input to read */
        return APR_SUCCESS;
    }

    if (len > DW_MAX_PARAM_LEN) {
        /* The payload is still unread on the socket, so the stream can no
         * longer be parsed; report failure so the caller stops. */
        dw_write_error(ctxt, req, "requested parameter size is too large.");
        return APR_EINVAL;
    }

    if (len < 0) {
        /* a negative length is a shortcut to search for the next newline */
        do {
            rv = apr_brigade_split_line(req->m_bbtmp, req->m_bbin,
                                        APR_BLOCK_READ, AP_IOBUFSIZE);
        } while (APR_STATUS_IS_EAGAIN(rv));

        if (rv) {
            apr_file_printf(ctxt->errfile,
                            "Error: Reading line from client: (%d) %pm\n",
                            rv, &rv);
            return rv;
        }
    }
    else {
        apr_bucket *after = NULL;
        apr_bucket *first;

        do {
            rv = apr_brigade_partition(req->m_bbin, len, &after);
        } while (APR_STATUS_IS_EAGAIN(rv));

        if (rv) {
            apr_file_printf(ctxt->errfile,
                            "Error: Reading line from client: (%d) %pm\n",
                            rv, &rv);
            return rv;
        }

        if (!after) {
            apr_file_printf(ctxt->errfile,
                            "Error: No bucket reading line from client: (%d) %pm\n",
                            rv, &rv);
            return APR_EGENERAL;
        }

        /* XXXX: apr_brigade_split sucks, makes you allocate a new brigade.
         *
         * `after` is the first bucket past the requested byte count, so the
         * payload is everything in front of it.  Move that run into the
         * scratch brigade and leave the remainder (including the socket
         * bucket) in m_bbin for the next command. */
        first = APR_BRIGADE_FIRST(req->m_bbin);
        if (first != after) {
            apr_bucket *last = APR_BUCKET_PREV(after);

            APR_RING_UNSPLICE(first, last, link);
            APR_RING_SPLICE_TAIL(&req->m_bbtmp->list, first, last,
                                 apr_bucket, link);
        }
    }

    rv = apr_brigade_pflatten(req->m_bbtmp, &buf, &blen, pool);
    apr_brigade_cleanup(req->m_bbtmp);
    if (rv) {
        apr_file_printf(ctxt->errfile,
                        "Error: Reading pflatten from client: (%d) %pm\n",
                        rv, &rv);
        return rv;
    }

    *obuf = buf;
    *oblen = blen;
    return APR_SUCCESS;
}

/*
 * Handle one "invoke <command> <length>" line.
 *
 * `buf` is the NUL-terminated line and is known to start with DW_INVOKE.
 * The parameter blob is always consumed before the handler lookup so the
 * stream stays in sync even when the command is unknown.
 *
 * @return APR_SUCCESS when the line was processed (including protocol
 *         errors that were only reported), or an error status when the
 *         connection can no longer be parsed.
 */
static apr_status_t dw_handle_invoke(dw_ctxt_t *ctxt, dw_req_t *req,
                                     char *buf, apr_size_t blen,
                                     apr_pool_t *pool)
{
    const char *name = buf + DW_INVOKE_LEN;
    const char *sep;
    const char *command;
    char *cmdbuf = NULL;
    apr_size_t cmdlen = 0;
    dw_plugin_t *plugin;
    apr_status_t rv;
    long plen;

    (void)blen;

    sep = strchr(name, ' ');
    if (!sep) {
        return dw_write_error(ctxt, req, "invalid invoke parameters");
    }

    command = apr_pstrndup(pool, name, (apr_size_t)(sep - name));

    /* Like atoi(), strtol() skips the separating blank and yields 0 when
     * no digits follow; unlike atoi() it reports out-of-range values. */
    errno = 0;
    plen = strtol(sep, NULL, 10);
    if (errno == ERANGE || plen > INT_MAX || plen < INT_MIN) {
        dw_write_error(ctxt, req, "invalid invoke parameters");
        return APR_EINVAL;
    }

    rv = dw_handle_command_param(ctxt, req, (int)plen, &cmdbuf, &cmdlen,
                                 pool);
    if (rv) {
        return rv;
    }

    plugin = apr_hash_get(ctxt->m_plugins, command, APR_HASH_KEY_STRING);
    if (!plugin) {
        apr_file_printf(ctxt->errfile, "Error: client command: %s\n",
                        command);
        return dw_write_error(ctxt, req, "no such task to invoke");
    }

    /* The handler's return value is not acted upon. */
    (void)plugin->m_cb(req, cmdbuf, cmdlen, plugin->m_baton);

    return APR_SUCCESS;
}

/* Dispatch one NUL-terminated command line; only "invoke " is known. */
static apr_status_t dw_handle_line(dw_ctxt_t *ctxt, dw_req_t *req,
                                   char *buf, apr_size_t blen,
                                   apr_pool_t *pool)
{
    if (strncmp(DW_INVOKE, buf, DW_INVOKE_LEN) == 0) {
        return dw_handle_invoke(ctxt, req, buf, blen, pool);
    }

    return dw_write_error(ctxt, req, "invalid command");
}

/*
 * Serve one connection: read command lines and dispatch them until a read
 * fails, an empty read occurs, or a command leaves the stream unusable.
 *
 * @return the status that ended the loop; never APR_SUCCESS.
 */
static apr_status_t dw_handle_client(dw_ctxt_t *ctxt, dw_worker_t *me,
                                     dw_req_t *req, apr_pool_t *conn_pool)
{
    apr_status_t rv = APR_SUCCESS;
    apr_pool_t *tpool = NULL;

    (void)me;

    apr_pool_create(&tpool, conn_pool);

    do {
        char buf[AP_IOBUFSIZE];
        /* keep one byte free for the terminating NUL */
        apr_size_t blen = sizeof(buf) - 1;

        /* per-command allocations are released before the next command */
        apr_pool_clear(tpool);

        do {
            rv = apr_brigade_split_line(req->m_bbtmp, req->m_bbin,
                                        APR_BLOCK_READ, blen);
        } while (APR_STATUS_IS_EAGAIN(rv));

        if (rv) {
            apr_file_printf(ctxt->errfile,
                            "Error: Reading line from client: (%d) %pm\n",
                            rv, &rv);
            break;
        }

        rv = apr_brigade_flatten(req->m_bbtmp, buf, &blen);
        apr_brigade_cleanup(req->m_bbtmp);
        if (rv) {
            apr_file_printf(ctxt->errfile,
                            "Error: Reading line from client: (%d) %pm\n",
                            rv, &rv);
            break;
        }

        if (blen == 0) {
            rv = APR_EGENERAL;
            apr_file_printf(ctxt->errfile,
                            "Error: Reading line from client: (%d) %pm\n",
                            rv, &rv);
            break;
        }

        buf[blen] = '\0';
        rv = dw_handle_line(ctxt, req, buf, blen, tpool);
    } while (rv == APR_SUCCESS);

    apr_pool_destroy(tpool);

    return rv;
}

/*
 * Accept the next connection and build its request state in pool `p`.
 *
 * The accept itself is serialised across workers with m_lmutex.
 *
 * @param p_req  receives the new request on success
 * @return APR_SUCCESS or the accept error
 */
static apr_status_t dw_get_client(dw_ctxt_t *ctxt, dw_req_t **p_req,
                                  apr_pool_t *p)
{
    apr_status_t rv;
    apr_socket_t *skt;
    apr_bucket *bsock;
    dw_req_t *req;

    apr_thread_mutex_lock(ctxt->m_lmutex);
    rv = apr_socket_accept(&skt, ctxt->m_lskt, p);
    apr_thread_mutex_unlock(ctxt->m_lmutex);

    if (rv) {
        apr_file_printf(ctxt->errfile,
                        "Error: Socket Accept Failed: (%d) %pm\n", rv, &rv);
        return rv;
    }

    req = apr_palloc(p, sizeof(*req));
    req->m_skt = skt;
    req->m_bballoc = apr_bucket_alloc_create(p);
    req->m_bbin = apr_brigade_create(p, req->m_bballoc);
    req->m_bbtmp = apr_brigade_create(p, req->m_bballoc);

    /* all input is pulled through a socket bucket at the brigade tail */
    bsock = apr_bucket_socket_create(req->m_skt, req->m_bballoc);
    APR_BRIGADE_INSERT_TAIL(req->m_bbin, bsock);

    *p_req = req;
    return APR_SUCCESS;
}

/*
 * Worker thread entry point: accept and serve connections until
 * m_keep_running is cleared.
 *
 * @param p_thread  this thread's handle
 * @param p_baton   the dw_worker_t for this thread
 */
static void* dw_worker_thread(apr_thread_t *p_thread, void *p_baton)
{
    apr_status_t rv = APR_SUCCESS;
    dw_worker_t *me = p_baton;
    dw_ctxt_t *ctxt = me->m_ctxt;
    apr_pool_t *tpool = NULL;

    /* Use the handle passed to us: me->m_thread is stored by the creating
     * thread and may not have been written yet when we start running. */
    me->m_pool = apr_thread_pool_get(p_thread);

    apr_pool_create(&tpool, me->m_pool);

    while (ctxt->m_keep_running == 1) {
        dw_req_t *req = NULL;

        rv = dw_get_client(ctxt, &req, tpool);
        if (rv) {
            apr_pool_clear(tpool);
            continue;
        }

        if (ctxt->m_conf.verbose) {
            apr_file_printf(ctxt->outfile, "connection open\n");
        }

        rv = dw_handle_client(ctxt, me, req, tpool);

        if (ctxt->m_conf.verbose) {
            apr_file_printf(ctxt->outfile, "connection close\n");
        }

        if (rv) {
            apr_file_printf(ctxt->errfile,
                            "Error: handle client returned: (%d) %pm\n",
                            rv, &rv);
        }

        /* drops everything allocated for this connection */
        apr_pool_clear(tpool);
    }

    if (ctxt->m_conf.verbose) {
        apr_file_printf(ctxt->outfile, "Thread exiting.\n");
    }

    return NULL;
}

/*
 * Register `func_cp` as the handler for command `cmd`.
 *
 * The command name is copied into the context pool.  Registering the same
 * name again replaces the earlier entry in the hash.
 */
void dw_register_handler(dw_ctxt_t *ctxt, dw_module_t *pmod,
                         const char *cmd, dw_handler_cb func_cp,
                         void *baton)
{
    dw_plugin_t *plugin = apr_palloc(ctxt->m_pool, sizeof(*plugin));

    plugin->m_stats.requests = 0;
    plugin->m_stats.bytes = 0;
    plugin->m_module = pmod;
    plugin->m_name = apr_pstrdup(ctxt->m_pool, cmd);
    plugin->m_cb = func_cp;
    plugin->m_baton = baton;

    if (ctxt->m_conf.verbose) {
        apr_file_printf(ctxt->outfile, "\tRegistered Handler: %s\n",
                        plugin->m_name);
    }

    apr_hash_set(ctxt->m_plugins, plugin->m_name, APR_HASH_KEY_STRING,
                 plugin);
}

/* Append `ml` to the end of the context's module list. */
static void dw_add_module(dw_ctxt_t* ctxt, dw_module_list_t *ml)
{
    dw_module_list_t **link = &ctxt->m_modules;

    while (*link) {
        link = &(*link)->next;
    }

    ml->next = NULL;
    *link = ml;
}

/*
 * Load one module DSO and let it register its handlers.
 *
 * The exported descriptor symbol is "<name>_module", where <name> is the
 * file's base name with its last extension removed.
 *
 * @return APR_SUCCESS, the DSO load/lookup error, or APR_EGENERAL on a
 *         module version mismatch
 */
static apr_status_t dw_load_module(dw_ctxt_t* ctxt, const char* file)
{
    apr_dso_handle_t *handle;
    apr_dso_handle_sym_t symbol;
    apr_status_t rv;
    char *path_copy;
    char *module_name;
    char *t;
    dw_module_t *module;
    dw_module_list_t *ml;

    rv = apr_dso_load(&handle, file, ctxt->m_pool);
    if (rv) {
        return rv;
    }

    /* figure out the module name; basename() may modify its argument, so
     * hand it a private copy rather than the caller's const string */
    path_copy = apr_pstrdup(ctxt->m_pool, file);
    module_name = apr_pstrdup(ctxt->m_pool, basename(path_copy));
    if ((t = strrchr(module_name, '.'))) {
        *t = '\0';
    }

    /* load the module info */
    t = apr_psprintf(ctxt->m_pool, "%s_module", module_name);
    rv = apr_dso_sym(&symbol, handle, t);
    if (rv) {
        apr_dso_unload(handle);
        return rv;
    }

    module = (dw_module_t*) symbol;

    if (module->version != DW_MODULE_VERSION) {
        /* magic byte mismatch, bail bail bail!*/
        apr_file_printf(ctxt->errfile,
                        "Error: '%s' contained an invalid version number. Found: %d Expected: %d\n",
                        file, module->version, DW_MODULE_VERSION);
        apr_dso_unload(handle);
        return APR_EGENERAL;
    }

    module->dso_handle = handle;

    ml = apr_palloc(ctxt->m_pool, sizeof(*ml));
    ml->m_module = module;
    dw_add_module(ctxt, ml);

    module->register_func(ctxt, module);

    return APR_SUCCESS;
}

/*
 * load user generated tasklets.
 *
 * Changes the working directory to the modules path and loads every file
 * matching "<modules_path>/dw_*.so".  Stops at the first module that fails
 * to load.
 *
 * NOTE(review): the glob pattern is built from modules_path after the
 * chdir, so a relative modules_path would be resolved twice -- confirm
 * whether only absolute paths are expected.
 */
static apr_status_t dw_load_modules(dw_ctxt_t* ctxt)
{
    int i;
    apr_status_t rv = APR_SUCCESS;
    apr_pool_t *tpool;
    apr_array_header_t *results;
    const char *pattern;
    char **entry;

    apr_pool_create(&tpool, ctxt->m_pool);

    rv = apr_filepath_set(ctxt->m_conf.modules_path, ctxt->m_pool);
    if (rv) {
        apr_file_printf(ctxt->errfile,
                        "Error: Unable to chdir into '%s': (%d) %pm\n",
                        ctxt->m_conf.modules_path, rv, &rv);
        goto out;
    }

    pattern = apr_pstrcat(tpool, ctxt->m_conf.modules_path, "/dw_*.so",
                          NULL);

    rv = apr_match_glob(pattern, &results, tpool);
    if (rv) {
        apr_file_printf(ctxt->errfile,
                        "Error: Glob Pattern Failed, %s: (%d) %pm\n",
                        pattern, rv, &rv);
        goto out;
    }

    if (results->nelts < 1) {
        apr_file_printf(ctxt->errfile, "Error: No Modules found at %s\n",
                        pattern);
        rv = APR_ENOSTAT;
        goto out;
    }

    entry = (char **)results->elts;
    for (i = 0; i < results->nelts; i++) {
        char *file = entry[i];

        if (ctxt->m_conf.verbose) {
            apr_file_printf(ctxt->outfile, "Loading Module: %s\n", file);
        }

        rv = dw_load_module(ctxt, file);
        if (rv) {
            apr_file_printf(ctxt->errfile,
                            "Error: Unable to load module %s: (%d) %pm\n",
                            file, rv, &rv);
            goto out;
        }
    }

out:
    apr_pool_destroy(tpool);
    return rv;
}

/*
 * start io / listening socket.
 *
 * Creates a blocking TCP socket with SO_REUSEADDR, binds it to the
 * configured address and port, starts listening, and creates the mutex
 * that serialises accept() across workers.
 */
static apr_status_t dw_start_io(dw_ctxt_t* ctxt)
{
    apr_status_t rv = APR_SUCCESS;
    apr_sockaddr_t *saddr;

    rv = apr_socket_create(&ctxt->m_lskt, APR_INET, SOCK_STREAM,
                           APR_PROTO_TCP, ctxt->m_pool);
    if (rv) {
        apr_file_printf(ctxt->errfile,
                        "Error: Unable to create listening socket: (%d) %pm\n",
                        rv, &rv);
        return rv;
    }

    rv = apr_socket_opt_set(ctxt->m_lskt, APR_SO_NONBLOCK, 0);
    if (rv) {
        apr_file_printf(ctxt->errfile,
                        "Error: Unable to configure socket as blocking: (%d) %pm\n",
                        rv, &rv);
        return rv;
    }

    rv = apr_socket_opt_set(ctxt->m_lskt, APR_SO_REUSEADDR, 1);
    if (rv) {
        apr_file_printf(ctxt->errfile,
                        "Error: Unable to configure socket as reuseaddr: (%d) %pm\n",
                        rv, &rv);
        return rv;
    }

    rv = apr_sockaddr_info_get(&saddr, ctxt->m_conf.addr, APR_UNSPEC,
                               ctxt->m_conf.port, 0, ctxt->m_pool);
    if (rv) {
        apr_file_printf(ctxt->errfile,
                        "Error: Unable to get sockaddr for %s:%d: (%d) %pm\n",
                        ctxt->m_conf.addr, ctxt->m_conf.port, rv, &rv);
        return rv;
    }

    rv = apr_socket_bind(ctxt->m_lskt, saddr);
    if (rv) {
        apr_file_printf(ctxt->errfile,
                        "Error: Unable to bind listening socket to %s:%d: (%d) %pm\n",
                        ctxt->m_conf.addr, ctxt->m_conf.port, rv, &rv);
        return rv;
    }

    rv = apr_socket_listen(ctxt->m_lskt, 5);
    if (rv) {
        apr_file_printf(ctxt->errfile,
                        "Error: Unable to enable listen socket on %s:%d: (%d) %pm\n",
                        ctxt->m_conf.addr, ctxt->m_conf.port, rv, &rv);
        return rv;
    }

    rv = apr_thread_mutex_create(&ctxt->m_lmutex, APR_THREAD_MUTEX_DEFAULT,
                                 ctxt->m_pool);
    if (rv) {
        apr_file_printf(ctxt->errfile,
                        "Error: Unable to create accept mutex: (%d) %pm\n",
                        rv, &rv);
        return rv;
    }

    return rv;
}

/*
 * startup all worker threads
 *
 * Each successfully created worker is pushed onto the front of
 * ctxt->m_threads so that dw_join_threads() can reach every one of them.
 */
static apr_status_t dw_start_threads(dw_ctxt_t* ctxt)
{
    apr_status_t rv = APR_SUCCESS;
    int i;

    for (i = 0; i < ctxt->m_conf.threads; i++) {
        dw_worker_t *worker = apr_pcalloc(ctxt->m_pool, sizeof(*worker));

        worker->m_ctxt = ctxt;

        if (ctxt->m_conf.verbose) {
            apr_file_printf(ctxt->outfile, "Spawning Thread #%d\n", i);
        }

        rv = apr_thread_create(&worker->m_thread, NULL, dw_worker_thread,
                               worker, ctxt->m_pool);
        if (rv) {
            apr_file_printf(ctxt->errfile,
                            "Error: unable to spawn thread: (%d) %pm\n",
                            rv, &rv);
            return rv;
        }

        /* link only workers that really have a thread behind them */
        worker->next = ctxt->m_threads;
        ctxt->m_threads = worker;
    }

    return APR_SUCCESS;
}

/*
 * waits for all threads to finish.
 *
 * Every worker is joined even if an earlier join failed; the first
 * failure is the one returned.
 */
static apr_status_t dw_join_threads(dw_ctxt_t* ctxt)
{
    apr_status_t result = APR_SUCCESS;
    dw_worker_t *cur;

    for (cur = ctxt->m_threads; cur; cur = cur->next) {
        apr_status_t thread_rv = APR_SUCCESS;
        apr_status_t rv = apr_thread_join(&thread_rv, cur->m_thread);

        if (rv) {
            apr_file_printf(ctxt->errfile,
                            "Error: Unable to join worker thread: (%d) %pm\n",
                            rv, &rv);
            if (result == APR_SUCCESS) {
                result = rv;
            }
        }
    }

    return result;
}

/* Context reachable from the signal handler; set by dw_setup_signals(). */
static dw_ctxt_t* dw___global_ctxt = NULL;

/* Signal handler: ask the workers to stop after their current client. */
static void dw_shutdown_sig(int signum)
{
    (void)signum;

    if (dw___global_ctxt) {
        dw___global_ctxt->m_keep_running = 0;
    }
}

/* Publish the context for the signal handler and install the handler. */
static apr_status_t dw_setup_signals(dw_ctxt_t* ctxt)
{
    dw___global_ctxt = ctxt;

    // XXXXX: need to break client out of the blocking accept().
    // maybe switch to a pollset() on the listening socket?
    // apr_signal(SIGINT, dw_shutdown_sig);
    //
    // NOTE(review): in the source this file was restored from, the SIGINT
    // registration directly follows a comment marker, so it is kept
    // disabled here -- confirm that this was the intent.
    apr_signal(SIGHUP, dw_shutdown_sig);

    return APR_SUCCESS;
}

/*
 * Run the server: signals, modules, listening socket, workers, then wait
 * for the workers to exit.
 *
 * @return EXIT_SUCCESS or EXIT_FAILURE, suitable as the process status
 */
static int dw_run(dw_ctxt_t* ctxt)
{
    if (dw_setup_signals(ctxt)) {
        return EXIT_FAILURE;
    }

    if (dw_load_modules(ctxt)) {
        return EXIT_FAILURE;
    }

    if (dw_start_io(ctxt)) {
        return EXIT_FAILURE;
    }

    if (dw_start_threads(ctxt)) {
        return EXIT_FAILURE;
    }

    if (dw_join_threads(ctxt)) {
        return EXIT_FAILURE;
    }

    return EXIT_SUCCESS;
}

/* Allocate the server context from `p` and fill in the defaults. */
static void dw_create_context(apr_pool_t *p, dw_ctxt_t **p_ctxt)
{
    /* zeroed so the socket and mutex pointers start out NULL */
    dw_ctxt_t *ctxt = apr_pcalloc(p, sizeof(*ctxt));

    ctxt->m_pool = p;
    apr_file_open_stderr(&ctxt->errfile, ctxt->m_pool);
    apr_file_open_stdout(&ctxt->outfile, ctxt->m_pool);

    ctxt->m_threads = NULL;
    ctxt->m_modules = NULL;
    ctxt->m_plugins = apr_hash_make(ctxt->m_pool);
    ctxt->m_keep_running = 1;

    ctxt->m_conf.modules_path = NULL;
    ctxt->m_conf.addr = "0.0.0.0";
    ctxt->m_conf.port = 11310;
    ctxt->m_conf.threads = 50;
    ctxt->m_conf.verbose = 0;

    *p_ctxt = ctxt;
}

/* Print the usage text to the error file and exit with EXIT_FAILURE. */
static void dw_show_usage(dw_ctxt_t *ctxt)
{
    apr_file_printf(ctxt->errfile, "dworker " DW_VERSION_STRING "\n");
    apr_file_printf(ctxt->errfile, "usage: dworker [options]\n");
    apr_file_printf(ctxt->errfile, "\n");
    apr_file_printf(ctxt->errfile, "--addr Set the IP address to bind to. Default: 0.0.0.0\n");
    apr_file_printf(ctxt->errfile, "--port Set the port to bind to. Default: 11310\n");
    apr_file_printf(ctxt->errfile, "--threads Set the number of worker threads. Default: 50\n");
    apr_file_printf(ctxt->errfile, "\n");
    apr_file_printf(ctxt->errfile, "--modules Set the path for modules to load.\n");
    apr_file_printf(ctxt->errfile, "\n");
    apr_file_printf(ctxt->errfile, "--help Display this screen and exit.\n");
    apr_file_printf(ctxt->errfile, "--version Display version and exit.\n");
    apr_file_printf(ctxt->errfile, "--verbose Display extra details while running\n");
    apr_file_printf(ctxt->errfile, "\n");
    exit(EXIT_FAILURE);
}

/* Print the version to the output file and exit with EXIT_SUCCESS. */
static void dw_show_version(dw_ctxt_t *ctxt)
{
    apr_file_printf(ctxt->outfile, "dworker " DW_VERSION_STRING "\n");
    exit(EXIT_SUCCESS);
}

/*
 * Program entry point: parse the options, validate them, run the server.
 *
 * An option that needs a value but is given as the last argument is
 * reported as an unknown parameter.
 */
int main(int argc, const char * const argv[])
{
    int i;
    dw_ctxt_t *ctxt = NULL;
    apr_pool_t *p = NULL;

    if (apr_initialize() != APR_SUCCESS) {
        return EXIT_FAILURE;
    }
    atexit(apr_terminate);

    apr_pool_create(&p, NULL);

    dw_create_context(p, &ctxt);

    for (i = 1; i < argc; i++) {
        const char *arg = argv[i];
        int has_value = (i + 1 < argc);

        if (strcmp("--version", arg) == 0) {
            dw_show_version(ctxt);
        }
        else if (strcmp("--modules", arg) == 0 && has_value) {
            ctxt->m_conf.modules_path = argv[++i];
        }
        else if (strcmp("--addr", arg) == 0 && has_value) {
            ctxt->m_conf.addr = argv[++i];
        }
        else if (strcmp("--port", arg) == 0 && has_value) {
            ctxt->m_conf.port = atoi(argv[++i]);
        }
        else if (strcmp("--threads", arg) == 0 && has_value) {
            ctxt->m_conf.threads = atoi(argv[++i]);
        }
        else if (strcmp("--verbose", arg) == 0) {
            ctxt->m_conf.verbose = 1;
        }
        else if (strcmp("--help", arg) == 0) {
            /* --help takes no value, so it must also work as last option */
            dw_show_usage(ctxt);
        }
        else {
            apr_file_printf(ctxt->errfile, "Error: unknown parameter.\n");
            dw_show_usage(ctxt);
        }
    }

    if (ctxt->m_conf.verbose) {
        apr_file_printf(ctxt->outfile, "dworker configuration:\n");
        apr_file_printf(ctxt->outfile, "\tAddress: %s\n", ctxt->m_conf.addr);
        apr_file_printf(ctxt->outfile, "\tPort: %d\n", ctxt->m_conf.port);
        apr_file_printf(ctxt->outfile, "\tModules Path: %s\n", ctxt->m_conf.modules_path);
        apr_file_printf(ctxt->outfile, "\tNum Threads: %d\n", ctxt->m_conf.threads);
    }

    if (!ctxt->m_conf.modules_path) {
        apr_file_printf(ctxt->errfile, "Error: --modules is a required parameter.\n");
        return EXIT_FAILURE;
    }

    if (ctxt->m_conf.threads < 1) {
        /* with no workers the server would start and exit immediately */
        apr_file_printf(ctxt->errfile, "Error: --threads must be at least 1.\n");
        return EXIT_FAILURE;
    }

    if (ctxt->m_conf.port < 0 || ctxt->m_conf.port > 65535) {
        apr_file_printf(ctxt->errfile, "Error: --port must be between 0 and 65535.\n");
        return EXIT_FAILURE;
    }

    return dw_run(ctxt);
}