/** *Licensed to the Apache Software Foundation (ASF) under one *or more contributor license agreements. See the NOTICE file *distributed with this work for additional information *regarding copyright ownership. The ASF licenses this file *to you under the Apache License, Version 2.0 (the *"License"); you may not use this file except in compliance *with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * *Unless required by applicable law or agreed to in writing, *software distributed under the License is distributed on an *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the *specific language governing permissions and limitations *under the License. */ /* * discovery.c * * \date Oct 4, 2011 * \author Apache Celix Project Team * \copyright Apache License, Version 2.0 */ #include #include #include #include #include #include #include #include "bundle_context.h" #include "array_list.h" #include "utils.h" #include "celix_errno.h" #include "filter.h" #include "service_reference.h" #include "service_registration.h" #include "discovery.h" struct discovery { bundle_context_pt context; apr_pool_t *pool; hash_map_pt listenerReferences; bool running; apr_thread_t *slpPoll; hash_map_pt slpServices; char *discoveryPort; array_list_pt handled; array_list_pt registered; }; struct slp_service { char *serviceUrl; char *attributes; }; typedef struct slp_service *slp_service_pt; celix_status_t discovery_informListener(discovery_pt discovery, endpoint_listener_pt listener, endpoint_description_pt endpoint); celix_status_t discovery_informListenerOfRemoval(discovery_pt discovery, endpoint_listener_pt listener, endpoint_description_pt endpoint); celix_status_t discovery_addService(discovery_pt discovery, endpoint_description_pt endpoint); celix_status_t discovery_removeService(discovery_pt discovery, endpoint_description_pt endpoint); celix_status_t discovery_updateEndpointListener(discovery_pt discovery, service_reference_pt reference, endpoint_listener_pt service); static void *APR_THREAD_FUNC discovery_pollSLP(apr_thread_t *thd, void *data); SLPBoolean discovery_pollSLPCallback(SLPHandle hslp, const char* srvurl, unsigned short lifetime, SLPError errcode, void* cookie); SLPBoolean discovery_attributesCallback(SLPHandle hslp, const char *attributes, SLPError error, void *cookie); celix_status_t discovery_deregisterEndpoint(discovery_pt discovery, const char *serviceUrl); void discovery_deregistrationReport(SLPHandle hslp, SLPError errcode, void* cookie); celix_status_t discovery_create(apr_pool_t *pool, bundle_context_pt context, discovery_pt *discovery) { celix_status_t status = CELIX_SUCCESS; *discovery = apr_palloc(pool, sizeof(**discovery)); if (!*discovery) { status = CELIX_ENOMEM; } else { (*discovery)->context = context; (*discovery)->pool = pool; (*discovery)->listenerReferences = hashMap_create(serviceReference_hashCode, NULL, serviceReference_equals2, NULL); (*discovery)->slpServices = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); (*discovery)->running = true; (*discovery)->discoveryPort = getenv("RSA_PORT"); if ((*discovery)->discoveryPort == NULL) { fw_log(logger, OSGI_FRAMEWORK_LOG_WARNING, "No RemoteServiceAdmin port set, set it using RSA_PORT!"); } (*discovery)->handled = NULL; arrayList_create(&(*discovery)->handled); (*discovery)->registered = NULL; arrayList_create(&(*discovery)->registered); apr_thread_create(&(*discovery)->slpPoll, NULL, discovery_pollSLP, *discovery, (*discovery)->pool); } return status; } celix_status_t discovery_deregisterEndpoint(discovery_pt discovery, const char *serviceUrl) { celix_status_t status = CELIX_SUCCESS; fw_log(logger, OSGI_FRAMEWORK_LOG_INFO, "DISCOVERY: Remove endpoint: %s.", serviceUrl); SLPError err; SLPError callbackerr; SLPHandle slp; err = SLPOpen("en", SLP_FALSE, &slp); if (err != SLP_OK) { status = CELIX_BUNDLE_EXCEPTION; } else { err = SLPDereg(slp, serviceUrl, discovery_deregistrationReport, &callbackerr); if ((err != SLP_OK) || (callbackerr != SLP_OK)) { fw_log(logger, OSGI_FRAMEWORK_LOG_INFO, "DISCOVERY: Error deregistering service (%s) with slp %d.", serviceUrl, err); status = CELIX_BUNDLE_EXCEPTION; } SLPClose(slp); } return status; } celix_status_t discovery_stop(discovery_pt discovery) { celix_status_t status = CELIX_SUCCESS; apr_status_t tstat; discovery->running = false; apr_status_t stat = apr_thread_join(&tstat, discovery->slpPoll); if (stat != APR_SUCCESS && tstat != APR_SUCCESS) { status = CELIX_BUNDLE_EXCEPTION; } int i; for (i = 0; i < arrayList_size(discovery->registered); i++) { char *url = arrayList_get(discovery->registered, i); discovery_deregisterEndpoint(discovery, url); } return status; } celix_status_t discovery_removeService(discovery_pt discovery, endpoint_description_pt endpoint) { celix_status_t status = CELIX_SUCCESS; fw_log(logger, OSGI_FRAMEWORK_LOG_INFO, "DISCOVERY: Remove service (%s).", endpoint->service); // Inform listeners of new endpoint hash_map_iterator_pt iter = hashMapIterator_create(discovery->listenerReferences); while (hashMapIterator_hasNext(iter)) { hash_map_entry_pt entry = hashMapIterator_nextEntry(iter); service_reference_pt reference = hashMapEntry_getKey(entry); endpoint_listener_pt listener = NULL; bundleContext_getService(discovery->context, reference, (void**)&listener); discovery_informListenerOfRemoval(discovery, listener, endpoint); } hashMapIterator_destroy(iter); return status; } celix_status_t discovery_addService(discovery_pt discovery, endpoint_description_pt endpoint) { celix_status_t status = CELIX_SUCCESS; // Inform listeners of new endpoint hash_map_iterator_pt iter = hashMapIterator_create(discovery->listenerReferences); while (hashMapIterator_hasNext(iter)) { hash_map_entry_pt entry = hashMapIterator_nextEntry(iter); service_reference_pt reference = hashMapEntry_getKey(entry); endpoint_listener_pt listener = NULL; char *scope = NULL; serviceReference_getProperty(reference, (char *) OSGI_ENDPOINT_LISTENER_SCOPE, &scope); filter_pt filter = filter_create(scope); bool matchResult = false; filter_match(filter, endpoint->properties, &matchResult); if (matchResult) { fw_log(logger, OSGI_FRAMEWORK_LOG_INFO, "DISCOVERY: Add service (%s)", endpoint->service); bundleContext_getService(discovery->context, reference, (void**)&listener); discovery_informListener(discovery, listener, endpoint); } } hashMapIterator_destroy(iter); return status; } celix_status_t discovery_informListener(discovery_pt discovery, endpoint_listener_pt listener, endpoint_description_pt endpoint) { celix_status_t status = CELIX_SUCCESS; listener->endpointAdded(listener->handle, endpoint, NULL); return status; } celix_status_t discovery_informListenerOfRemoval(discovery_pt discovery, endpoint_listener_pt listener, endpoint_description_pt endpoint) { celix_status_t status = CELIX_SUCCESS; listener->endpointRemoved(listener->handle, endpoint, NULL); return status; } celix_status_t discovery_constructServiceUrl(discovery_pt discovery, endpoint_description_pt endpoint, char **serviceUrl) { celix_status_t status = CELIX_SUCCESS; if (*serviceUrl != NULL || discovery == NULL || endpoint == NULL) { status = CELIX_ILLEGAL_ARGUMENT; } else { char host[APRMAXHOSTLEN + 1]; apr_sockaddr_t *sa; char *ip; apr_status_t stat = apr_gethostname(host, APRMAXHOSTLEN + 1, discovery->pool); if (stat != APR_SUCCESS) { status = CELIX_BUNDLE_EXCEPTION; } else { stat = apr_sockaddr_info_get(&sa, host, APR_INET, 0, 0, discovery->pool); if (stat != APR_SUCCESS) { status = CELIX_BUNDLE_EXCEPTION; } else { stat = apr_sockaddr_ip_get(&ip, sa); if (stat != APR_SUCCESS) { status = CELIX_BUNDLE_EXCEPTION; } else { *serviceUrl = apr_pstrcat(discovery->pool, "service:osgi.remote:http://", ip, ":", discovery->discoveryPort, "/services/", endpoint->service, NULL); } } } } return status; } void discovery_registrationReport(SLPHandle hslp, SLPError errcode, void* cookie) { *(SLPError*)cookie = errcode; } celix_status_t discovery_endpointAdded(void *handle, endpoint_description_pt endpoint, char *machtedFilter) { celix_status_t status = CELIX_SUCCESS; fw_log(logger, OSGI_FRAMEWORK_LOG_INFO, "DISCOVERY: Endpoint for %s, with filter \"%s\" added.", endpoint->service, machtedFilter); discovery_pt discovery = handle; SLPError err; SLPError callbackerr; SLPHandle slp; char *serviceUrl = NULL; err = SLPOpen("en", SLP_FALSE, &slp); if (err != SLP_OK) { status = CELIX_ILLEGAL_STATE; } else { status = discovery_constructServiceUrl(discovery, endpoint, &serviceUrl); if (status == CELIX_SUCCESS) { char *attributes = ""; hash_map_iterator_pt iter = hashMapIterator_create(endpoint->properties); while (hashMapIterator_hasNext(iter)) { hash_map_entry_pt entry = hashMapIterator_nextEntry(iter); char *key = hashMapEntry_getKey(entry); char *value = hashMapEntry_getValue(entry); if (strlen(attributes) != 0) { attributes = apr_pstrcat(discovery->pool, attributes, ",", NULL); } attributes = apr_pstrcat(discovery->pool, attributes, "(", key, "=", value, ")", NULL); } hashMapIterator_destroy(iter); err = SLPReg(slp, serviceUrl, SLP_LIFETIME_MAXIMUM, 0, attributes, SLP_TRUE, discovery_registrationReport, &callbackerr); if ((err != SLP_OK) || (callbackerr != SLP_OK)) { status = CELIX_ILLEGAL_STATE; fw_log(logger, OSGI_FRAMEWORK_LOG_ERROR, "DISCOVERY: Error registering service (%s) with slp %d", serviceUrl, err); } arrayList_add(discovery->registered, serviceUrl); } SLPClose(slp); } return status; } void discovery_deregistrationReport(SLPHandle hslp, SLPError errcode, void* cookie) { *(SLPError*)cookie = errcode; } celix_status_t discovery_endpointRemoved(void *handle, endpoint_description_pt endpoint, char *machtedFilter) { celix_status_t status = CELIX_SUCCESS; fw_log(logger, OSGI_FRAMEWORK_LOG_INFO, "DISCOVERY: Endpoint for %s, with filter \"%s\" removed.", endpoint->service, machtedFilter); discovery_pt discovery = handle; SLPError err; SLPHandle slp; char *serviceUrl = NULL; err = SLPOpen("en", SLP_FALSE, &slp); if (err != SLP_OK) { status = CELIX_ILLEGAL_STATE; } else { status = discovery_constructServiceUrl(discovery, endpoint, &serviceUrl); if (status == CELIX_SUCCESS) { status = discovery_deregisterEndpoint(discovery, serviceUrl); int i; for (i = 0; i < arrayList_size(discovery->registered); i++) { char *url = arrayList_get(discovery->registered, i); if (strcmp(url, serviceUrl) == 0) { arrayList_remove(discovery->registered, i); } } } } return status; } celix_status_t discovery_endpointListenerAdding(void * handle, service_reference_pt reference, void **service) { celix_status_t status = CELIX_SUCCESS; discovery_pt discovery = handle; bundleContext_getService(discovery->context, reference, service); return status; } celix_status_t discovery_endpointListenerAdded(void * handle, service_reference_pt reference, void * service) { celix_status_t status = CELIX_SUCCESS; discovery_pt discovery = handle; char *discoveryListener = NULL; serviceReference_getProperty(reference, "DISCOVERY", &discoveryListener); if (discoveryListener != NULL && strcmp(discoveryListener, "true") == 0) { fw_log(logger, OSGI_FRAMEWORK_LOG_DEBUG, "DISCOVERY: EndpointListener Ignored - Discovery listener."); } else { fw_log(logger, OSGI_FRAMEWORK_LOG_DEBUG, "DISCOVERY: EndpointListener Added - Add Scope."); discovery_updateEndpointListener(discovery, reference, (endpoint_listener_pt) service); } return status; } celix_status_t discovery_endpointListenerModified(void * handle, service_reference_pt reference, void * service) { celix_status_t status = CELIX_SUCCESS; discovery_pt discovery = handle; fw_log(logger, OSGI_FRAMEWORK_LOG_DEBUG, "DISCOVERY: EndpointListener Modified - Update Scope"); discovery_updateEndpointListener(discovery, reference, (endpoint_listener_pt) service); return status; } celix_status_t discovery_updateEndpointListener(discovery_pt discovery, service_reference_pt reference, endpoint_listener_pt service) { celix_status_t status = CELIX_SUCCESS; char *scope = "createScopeHere"; array_list_pt scopes = hashMap_get(discovery->listenerReferences, reference); if (scopes == NULL) { scopes = NULL; arrayList_create(&scopes); hashMap_put(discovery->listenerReferences, reference, scopes); } if (!arrayList_contains(scopes, scope)) { arrayList_add(scopes, scope); } hash_map_iterator_pt iter = hashMapIterator_create(discovery->slpServices); while (hashMapIterator_hasNext(iter)) { hash_map_entry_pt entry = hashMapIterator_nextEntry(iter); endpoint_description_pt value = hashMapEntry_getValue(entry); discovery_informListener(discovery, service, value); } hashMapIterator_destroy(iter); return status; } celix_status_t discovery_endpointListenerRemoved(void * handle, service_reference_pt reference, void * service) { celix_status_t status = CELIX_SUCCESS; discovery_pt discovery = handle; fw_log(logger, OSGI_FRAMEWORK_LOG_DEBUG, "DISCOVERY: EndpointListener Removed."); hashMap_remove(discovery->listenerReferences, reference); return status; } static void *APR_THREAD_FUNC discovery_pollSLP(apr_thread_t *thd, void *data) { discovery_pt discovery = data; SLPHandle slp; SLPError err; err = SLPOpen("en", SLP_FALSE, &slp); while (discovery->running) { SLPError err = SLP_OK; arrayList_clear(discovery->handled); while (err == SLP_TRUE) { err = SLPFindSrvs(slp, "osgi.remote", 0, 0, discovery_pollSLPCallback, data); } hash_map_iterator_pt iter = hashMapIterator_create(discovery->slpServices); while (hashMapIterator_hasNext(iter)) { hash_map_entry_pt entry = hashMapIterator_nextEntry(iter); char *key = hashMapEntry_getKey(entry); endpoint_description_pt value = hashMapEntry_getValue(entry); bool inUse = false; int i; for (i = 0; i < arrayList_size(discovery->handled); i++) { char *url = arrayList_get(discovery->handled, i); if (strcmp(url, key) == 0) { inUse = true; break; } } if (!inUse) { discovery_removeService(discovery, value); hashMapIterator_remove(iter); } } hashMapIterator_destroy(iter); sleep(1); } SLPClose(slp); apr_thread_exit(thd, APR_SUCCESS); return NULL; } SLPBoolean discovery_pollSLPCallback(SLPHandle hslp, const char* srvurl, unsigned short lifetime, SLPError errcode, void *cookie) { discovery_pt discovery = cookie; if (errcode == SLP_OK) { arrayList_add(discovery->handled, (void *) srvurl); if (!hashMap_containsKey(discovery->slpServices, (void *) srvurl)) { // service:osgi.remote:http://10.0.0.21:8080/services/example if (strncmp(srvurl, "service:osgi.remote:", 20) == 0) { const char *url = srvurl+20; const char *srv = strrchr(url, '/')+1; SLPHandle handle = NULL; SLPError err = SLPOpen("en", SLP_FALSE, &handle); err = SLP_OK; slp_service_pt slpService = apr_palloc(discovery->pool, sizeof(*slpService)); while (err == SLP_OK) { err = SLPFindAttrs(handle, srvurl, "", "", discovery_attributesCallback, slpService); } properties_pt props = properties_create(); char *track; char *token = apr_strtok(slpService->attributes, ",", &track); while (token != NULL) { char *track2; char *token2 = apr_strtok(token, "=", &track2); char *token3 = apr_strtok(NULL, "=", &track2); char *key = apr_pstrdup(discovery->pool, token2+1); char *value = apr_pstrndup(discovery->pool, token3, strlen(token3) - 1); properties_set(props, key, value); token = apr_strtok(NULL, ",", &track); } endpoint_description_pt endpoint = apr_palloc(discovery->pool, sizeof(*endpoint)); endpoint->id = apr_pstrdup(discovery->pool, url); endpoint->serviceId = 42; endpoint->service = apr_pstrdup(discovery->pool, srv); endpoint->properties = props; endpoint->frameworkUUID = properties_get(props, "endpoint.framework.uuid"); discovery_addService(discovery, endpoint); hashMap_put(discovery->slpServices, apr_pstrdup(discovery->pool, srvurl), endpoint); } } } else if (errcode == SLP_LAST_CALL) { return SLP_FALSE; } else { } return SLP_TRUE; } SLPBoolean discovery_attributesCallback(SLPHandle hslp, const char *attributes, SLPError error, void *cookie) { slp_service_pt slpService = cookie; if (error == SLP_OK) { slpService->attributes = strdup(attributes); } else if (error == SLP_LAST_CALL) { return SLP_FALSE; } return SLP_TRUE; }