/**
*Licensed to the Apache Software Foundation (ASF) under one
*or more contributor license agreements. See the NOTICE file
*distributed with this work for additional information
*regarding copyright ownership. The ASF licenses this file
*to you under the Apache License, Version 2.0 (the
*"License"); you may not use this file except in compliance
*with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*Unless required by applicable law or agreed to in writing,
*software distributed under the License is distributed on an
*"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
*specific language governing permissions and limitations
*under the License.
*/
/*
* discovery.c
*
* \date Sep 1, 2013
* \author Apache Celix Project Team
* \copyright Apache License, Version 2.0
*/
#include
#include
#include
#include
#include
#include
#include
#include "constants.h"
#include "bundle_context.h"
#include "array_list.h"
#include "utils.h"
#include "celix_errno.h"
#include "filter.h"
#include "service_reference.h"
#include "service_registration.h"
#include "remote_constants.h"
#include "celix_log.h"
#include "discovery.h"
static void *APR_THREAD_FUNC discovery_poll(apr_thread_t *thd, void *data);
static void discovery_browseCallback(DNSServiceRef sdRef, DNSServiceFlags flags,
uint32_t interfaceIndex, DNSServiceErrorType errorCode,
const char *serviceName, const char *regtype, const char *replyDomain,
void *context);
static void discovery_resolveAddCallback(DNSServiceRef sdRef,
DNSServiceFlags flags, uint32_t interfaceIndex,
DNSServiceErrorType errorCode, const char *fullname,
const char *hosttarget, uint16_t port, /* In network byte order */
uint16_t txtLen, const unsigned char *txtRecord, void *context);
static void discovery_resolveRemoveCallback(DNSServiceRef sdRef,
DNSServiceFlags flags, uint32_t interfaceIndex,
DNSServiceErrorType errorCode, const char *fullname,
const char *hosttarget, uint16_t port, /* In network byte order */
uint16_t txtLen, const unsigned char *txtRecord, void *context);
static celix_status_t discovery_informEndpointListeners(discovery_pt discovery, endpoint_description_pt endpoint, bool addingService);
static const char * const DEFAULT_DISCOVERY_PORT = "8889";
static const char * const OSGI_SERVICE_TYPE = "_osgi._udp";
typedef struct discovered_endpoint_entry {
apr_pool_t *pool;
endpoint_description_pt endpointDescription;
} * discovered_endpoint_entry_pt;
typedef struct disclosed_endpoint_entry {
apr_pool_t *pool;
endpoint_description_pt endpointDescription;
TXTRecordRef *txtRecord;
DNSServiceRef dnsServiceRef;
} * disclosed_endpoint_entry_pt;
struct discovery {
bundle_context_pt context;
apr_pool_t *pool;
apr_thread_mutex_t *listenerReferencesMutex;
apr_thread_mutex_t *discoveredServicesMutex;
apr_thread_mutex_t *disclosedServicesMutex;
hash_map_pt listenerReferences; //key=serviceReference, value=?? TODO
hash_map_pt discoveredServices; //key=endpointId (string), value=discovered_endpoint_entry_pt;
hash_map_pt disclosedServices; //key=endpointId (string), value=disclosed_endpoint_entry_pt;
volatile bool running;
apr_thread_t *poll;
DNSServiceRef browseRef;
char *discoveryPort;
char *frameworkUuid;
};
celix_status_t discovery_create(apr_pool_t *pool, bundle_context_pt context, discovery_pt *discovery) {
celix_status_t status = CELIX_SUCCESS;
*discovery = apr_palloc(pool, sizeof(**discovery));
if (!*discovery) {
status = CELIX_ENOMEM;
} else {
(*discovery)->context = context;
(*discovery)->pool = pool;
(*discovery)->listenerReferences = hashMap_create(serviceReference_hashCode, NULL, serviceReference_equals2, NULL);
(*discovery)->discoveredServices = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
(*discovery)->disclosedServices = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
(*discovery)->running = true;
(*discovery)->browseRef = NULL;
(*discovery)->discoveryPort = NULL;
(*discovery)->listenerReferencesMutex = NULL;
(*discovery)->discoveredServicesMutex = NULL;
(*discovery)->disclosedServicesMutex = NULL;
(*discovery)->frameworkUuid = NULL;
bundleContext_getProperty(context, OSGI_FRAMEWORK_FRAMEWORK_UUID, &(*discovery)->frameworkUuid);
CELIX_DO_IF(status, status = apr_thread_mutex_create(&(*discovery)->listenerReferencesMutex, APR_THREAD_MUTEX_DEFAULT, pool));
CELIX_DO_IF(status, status = apr_thread_mutex_create(&(*discovery)->discoveredServicesMutex, APR_THREAD_MUTEX_DEFAULT, pool));
CELIX_DO_IF(status, status = apr_thread_mutex_create(&(*discovery)->disclosedServicesMutex, APR_THREAD_MUTEX_DEFAULT, pool));
char *port = NULL;
bundleContext_getProperty(context, "DISCOVERY_PORT", &port);
if (port == NULL) {
(*discovery)->discoveryPort = (char *) DEFAULT_DISCOVERY_PORT;
} else {
(*discovery)->discoveryPort = apr_pstrdup(pool, port);
}
DNSServiceErrorType error = DNSServiceBrowse(
&(*discovery)->browseRef,
0,
0,
OSGI_SERVICE_TYPE,
NULL, /* may be NULL */
discovery_browseCallback,
(*discovery)/* may be NULL */
);
if (error != kDNSServiceErr_NoError) {
status = CELIX_ILLEGAL_STATE;
}
status = CELIX_DO_IF(status, apr_thread_create(&(*discovery)->poll, NULL, discovery_poll, *discovery, (*discovery)->pool));
}
return status;
}
celix_status_t discovery_stop(discovery_pt discovery) {
celix_status_t status;
apr_status_t tstat;
discovery->running = false;
DNSServiceRefDeallocate(discovery->browseRef);
apr_status_t stat = apr_thread_join(&tstat, discovery->poll);
if (stat != APR_SUCCESS && tstat != APR_SUCCESS) {
status = CELIX_BUNDLE_EXCEPTION;
}
apr_thread_mutex_lock(discovery->disclosedServicesMutex);
hash_map_iterator_pt iter = hashMapIterator_create(discovery->disclosedServices);
while (hashMapIterator_hasNext(iter)) {
hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
disclosed_endpoint_entry_pt endpointEntry = hashMapEntry_getValue(entry);
DNSServiceRefDeallocate(endpointEntry->dnsServiceRef);
}
hashMapIterator_destroy(iter);
iter = hashMapIterator_create(discovery->discoveredServices);
while (hashMapIterator_hasNext(iter)) {
hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
discovered_endpoint_entry_pt endpointEntry = hashMapEntry_getValue(entry);
discovery_informEndpointListeners(discovery, endpointEntry->endpointDescription, false);
}
hashMapIterator_destroy(iter);
hashMap_destroy(discovery->disclosedServices, false, false);
discovery->disclosedServices = NULL;
apr_thread_mutex_unlock(discovery->disclosedServicesMutex);
apr_thread_mutex_lock(discovery->discoveredServicesMutex);
hashMap_destroy(discovery->discoveredServices, false, false);
discovery->discoveredServices = NULL;
apr_thread_mutex_unlock(discovery->discoveredServicesMutex);
apr_thread_mutex_lock(discovery->listenerReferencesMutex);
hashMap_destroy(discovery->listenerReferences, false, false);
discovery->listenerReferences = NULL;
apr_thread_mutex_unlock(discovery->listenerReferencesMutex);
return status;
}
celix_status_t discovery_endpointAdded(void *handle, endpoint_description_pt endpoint, char *machtedFilter) {
celix_status_t status = CELIX_SUCCESS;
discovery_pt discovery = handle;
fw_log(logger, OSGI_FRAMEWORK_LOG_INFO, "DISCOVERY: Endpoint for %s, with filter \"%s\" added.", endpoint->service, machtedFilter);
disclosed_endpoint_entry_pt entry = NULL;
apr_pool_t *childPool = NULL;
status = apr_pool_create(&childPool, discovery->pool);
if (status == CELIX_SUCCESS) {
entry = apr_palloc(childPool, sizeof(*entry));
if (entry == NULL) {
status = CELIX_ENOMEM;
apr_pool_destroy(childPool);
} else {
entry->pool = childPool;
entry->endpointDescription = endpoint;
}
}
if (status == CELIX_SUCCESS) {
DNSServiceRef sdRef = NULL;
DNSServiceErrorType error;
TXTRecordRef txtRecord;
TXTRecordCreate(&txtRecord, 256, NULL ); //TODO search for correct default record size
char serviceId[16];
sprintf(serviceId, "%li", endpoint->serviceId);
TXTRecordSetValue(&txtRecord, "service", strlen(endpoint->service),
endpoint->service);
TXTRecordSetValue(&txtRecord, "service.id", strlen(serviceId),
serviceId);
TXTRecordSetValue(&txtRecord, "endpoint.id", strlen(endpoint->id),
endpoint->id);
TXTRecordSetValue(&txtRecord, "framework.uuid", strlen(discovery->frameworkUuid), discovery->frameworkUuid);
hash_map_iterator_pt iter = hashMapIterator_create(
endpoint->properties);
while (hashMapIterator_hasNext(iter)) {
hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
char *key = hashMapEntry_getKey(entry);
char *value = hashMapEntry_getValue(entry);
TXTRecordSetValue(&txtRecord, key, strlen(value), value);
}
hashMapIterator_destroy(iter);
int port = atoi(discovery->discoveryPort);
int portInNetworkByteOrder = ((port << 8) & 0xFF00)
| ((port >> 8) & 0xFF); //FIXME assuming little endian
error = DNSServiceRegister(&sdRef, 0, 0, endpoint->service,
OSGI_SERVICE_TYPE, NULL,
NULL, portInNetworkByteOrder, /* In network byte order */
TXTRecordGetLength(&txtRecord), TXTRecordGetBytesPtr(&txtRecord),
NULL, NULL );
if (error != kDNSServiceErr_NoError) {
status = CELIX_ILLEGAL_STATE;
} else {
//entry->txtRecord=txtRecord; TODO
entry->dnsServiceRef = sdRef;
apr_thread_mutex_lock(discovery->disclosedServicesMutex);
if (discovery->disclosedServices != NULL) {
hashMap_put(discovery->disclosedServices, endpoint->id, entry);
}
apr_thread_mutex_unlock(discovery->disclosedServicesMutex);
}
}
return status;
}
celix_status_t discovery_endpointRemoved(void *handle, endpoint_description_pt endpoint, char *machtedFilter) {
celix_status_t status = CELIX_SUCCESS;
discovery_pt discovery = handle;
disclosed_endpoint_entry_pt entry = NULL;
apr_thread_mutex_lock(discovery->disclosedServicesMutex);
if (discovery->disclosedServices != NULL) {
entry = hashMap_remove(discovery->disclosedServices, endpoint->id);
}
if (entry != NULL) {
DNSServiceRefDeallocate(entry->dnsServiceRef);
apr_pool_destroy(entry->pool);
} else {
status = CELIX_ILLEGAL_STATE;
}
apr_thread_mutex_unlock(discovery->disclosedServicesMutex);
return status;
}
celix_status_t discovery_endpointListenerAdding(void * handle, service_reference_pt reference, void **service) {
celix_status_t status = CELIX_SUCCESS;
discovery_pt discovery = handle;
bundleContext_getService(discovery->context, reference, service);
return status;
}
celix_status_t discovery_endpointListenerAdded(void * handle, service_reference_pt reference, void * service) {
celix_status_t status = CELIX_SUCCESS;
discovery_pt discovery = handle;
char *discoveryListener = NULL;
serviceReference_getProperty(reference, "DISCOVERY", &discoveryListener);
if (discoveryListener != NULL && strcmp(discoveryListener, "true") == 0) {
fw_log(logger, OSGI_FRAMEWORK_LOG_INFO, "DISCOVERY: EndpointListener Ignored - Discovery listener.");
} else {
fw_log(logger, OSGI_FRAMEWORK_LOG_INFO, "DISCOVERY: EndpointListener Added - Add Scope.");
apr_thread_mutex_lock(discovery->discoveredServicesMutex);
if (discovery->discoveredServices != NULL) {
hash_map_iterator_pt iter = hashMapIterator_create(discovery->discoveredServices);
while (hashMapIterator_hasNext(iter)) {
endpoint_description_pt endpoint = hashMapIterator_nextKey(iter);
endpoint_listener_pt listener = service;
char *scope = NULL;
serviceReference_getProperty(reference, (char *) OSGI_ENDPOINT_LISTENER_SCOPE, &scope);
filter_pt filter = filter_create(scope); //FIXME memory leak
bool matchResult = false;
filter_match(filter, endpoint->properties, &matchResult);
if (matchResult) {
listener->endpointAdded(listener, endpoint, NULL);
}
}
hashMapIterator_destroy(iter);
}
apr_thread_mutex_unlock(discovery->discoveredServicesMutex);
apr_thread_mutex_lock(discovery->listenerReferencesMutex);
if (discovery->listenerReferences != NULL) {
hashMap_put(discovery->listenerReferences, reference, NULL /*TODO is the scope value needed?*/);
}
apr_thread_mutex_unlock(discovery->listenerReferencesMutex);
}
return status;
}
celix_status_t discovery_endpointListenerModified(void * handle, service_reference_pt reference, void * service) {
celix_status_t status = CELIX_SUCCESS;
return status;
}
celix_status_t discovery_endpointListenerRemoved(void * handle, service_reference_pt reference, void * service) {
celix_status_t status = CELIX_SUCCESS;
discovery_pt discovery = handle;
fw_log(logger, OSGI_FRAMEWORK_LOG_INFO, "DISCOVERY: EndpointListener Removed");
apr_thread_mutex_lock(discovery->listenerReferencesMutex);
if (discovery->listenerReferences != NULL) {
hashMap_remove(discovery->listenerReferences, reference);
}
apr_thread_mutex_unlock(discovery->listenerReferencesMutex);
return status;
}
static void *APR_THREAD_FUNC discovery_poll(apr_thread_t *thd, void *data) {
discovery_pt discovery = data;
while (discovery->running) {
DNSServiceProcessResult(discovery->browseRef);
}
apr_thread_exit(thd, APR_SUCCESS);
return NULL;
}
static void discovery_browseCallback(DNSServiceRef sdRef, DNSServiceFlags flags,
uint32_t interfaceIndex, DNSServiceErrorType errorCode,
const char *serviceName, const char *regtype, const char *replyDomain,
void *context) {
if (flags & kDNSServiceFlagsAdd) {
fw_log(logger, OSGI_FRAMEWORK_LOG_INFO, "Added service with %s %s %s.", serviceName, regtype,
replyDomain);
DNSServiceRef resolveRef = NULL;
DNSServiceErrorType resolveError = DNSServiceResolve(&resolveRef, 0, 0,
serviceName, regtype, replyDomain, discovery_resolveAddCallback,
context);
fw_log(logger, OSGI_FRAMEWORK_LOG_ERROR, "Resolve return with error %d.", resolveError);
if (resolveError == kDNSServiceErr_NoError) {
DNSServiceProcessResult(resolveRef);
} else {
//TODO print error / handle error?
}
} else {
fw_log(logger, OSGI_FRAMEWORK_LOG_INFO, "Removed service with %s %s %s.", serviceName, regtype,
replyDomain);
DNSServiceRef resolveRef = NULL;
DNSServiceErrorType resolveError = DNSServiceResolve(&resolveRef, 0, 0,
serviceName, regtype, replyDomain, discovery_resolveRemoveCallback,
context);
if (resolveError == kDNSServiceErr_NoError) {
DNSServiceProcessResult(resolveRef);
} else {
//TODO print error / handle error?
}
}
}
static void discovery_resolveRemoveCallback(DNSServiceRef sdRef,
DNSServiceFlags flags, uint32_t interfaceIndex,
DNSServiceErrorType errorCode, const char *fullname,
const char *hosttarget, uint16_t port, /* In network byte order */
uint16_t txtLen, const unsigned char *txtRecord, void *context) {
discovery_pt discovery = context;
apr_thread_mutex_lock(discovery->discoveredServicesMutex);
discovered_endpoint_entry_pt entry = NULL;
if (discovery->discoveredServices != NULL) {
entry = hashMap_remove(discovery->discoveredServices, (void *)fullname);
}
apr_thread_mutex_unlock(discovery->discoveredServicesMutex);
if (entry != NULL) {
discovery_informEndpointListeners(discovery, entry->endpointDescription, false);
properties_destroy(entry->endpointDescription->properties);
apr_pool_destroy(entry->pool);
} else {
//unknown or own endpoint -> ignore
}
}
static void discovery_resolveAddCallback(DNSServiceRef sdRef,
DNSServiceFlags flags, uint32_t interfaceIndex,
DNSServiceErrorType errorCode, const char *fullname,
const char *hosttarget, uint16_t port, /* In network byte order */
uint16_t txtLen, const unsigned char *txtRecord, void *context) {
discovery_pt discovery = context;
properties_pt props = properties_create();
int length = TXTRecordGetCount(txtLen, txtRecord);
for (int i = 0; i < length; i += 1) {
char key[256];
char valueBuf[257]; //max uint8 + 1
const void *value = NULL;
uint8_t valueSize = 0;
TXTRecordGetItemAtIndex(txtLen, txtRecord, i, 256, key, &valueSize,
&value);
memcpy(valueBuf, value, valueSize);
valueBuf[valueSize] = '\0';
fw_log(logger, OSGI_FRAMEWORK_LOG_DEBUG, "Found key=value %s=%s.", key, valueBuf);
properties_set(props, key, valueBuf);
}
char *endpointFrameworkUuid = properties_get(props, (char *)OSGI_RSA_ENDPOINT_FRAMEWORK_UUID);
if (endpointFrameworkUuid == NULL) {
fw_log(logger, OSGI_FRAMEWORK_LOG_WARNING, "DISCOVERY: Cannot process endpoint, no %s property.", OSGI_RSA_ENDPOINT_FRAMEWORK_UUID);
} else if (strcmp(endpointFrameworkUuid, discovery->frameworkUuid) != 0) {
apr_pool_t *childPool = NULL;
apr_pool_create(&childPool, discovery->pool);
discovered_endpoint_entry_pt entry = apr_palloc(childPool, sizeof(*entry));
endpoint_description_pt endpoint = apr_palloc(childPool, sizeof(*endpoint));
char *serviceId = properties_get(props, "endpoint.service.id");
endpoint->id = properties_get(props, "endpoint.id");
endpoint->serviceId = serviceId == NULL? 0 : atol(serviceId);
endpoint->service = properties_get(props, "objectClass");
endpoint->properties = props;
endpoint->frameworkUUID = endpointFrameworkUuid;
entry->pool = childPool;
entry->endpointDescription = endpoint;
apr_thread_mutex_lock(discovery->discoveredServicesMutex);
if (discovery->discoveredServices != NULL) {
hashMap_put(discovery->discoveredServices, endpoint->id, entry);
}
apr_thread_mutex_unlock(discovery->discoveredServicesMutex);
discovery_informEndpointListeners(discovery, endpoint, true);
} else {
//ignore self disclosed endpoints!
fw_log(logger, OSGI_FRAMEWORK_LOG_DEBUG, "DISCOVERY: Ignoring own endpoint, with service %s!", properties_get(props, "service"));
properties_destroy(props);
}
}
static celix_status_t discovery_informEndpointListeners(discovery_pt discovery, endpoint_description_pt endpoint, bool endpointAdded) {
celix_status_t status = CELIX_SUCCESS;
// Inform listeners of new endpoint
apr_thread_mutex_lock(discovery->listenerReferencesMutex);
if (discovery->listenerReferences != NULL) {
hash_map_iterator_pt iter = hashMapIterator_create(discovery->listenerReferences);
while (hashMapIterator_hasNext(iter)) {
hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
service_reference_pt reference = hashMapEntry_getKey(entry);
endpoint_listener_pt listener = NULL;
char *scope = NULL;
serviceReference_getProperty(reference, (char *) OSGI_ENDPOINT_LISTENER_SCOPE, &scope);
filter_pt filter = filter_create(scope);
bool matchResult = false;
filter_match(filter, endpoint->properties, &matchResult);
if (matchResult) {
fw_log(logger, OSGI_FRAMEWORK_LOG_INFO, "DISCOVERY: Add service (%s)", endpoint->service);
bundleContext_getService(discovery->context, reference,
(void**) &listener);
if (endpointAdded) {
listener->endpointAdded(listener->handle, endpoint, NULL );
} else {
listener->endpointRemoved(listener->handle, endpoint, NULL );
}
}
}
hashMapIterator_destroy(iter);
}
apr_thread_mutex_unlock(discovery->listenerReferencesMutex);
return status;
}