/*
 * Revision-control identification string kept in the object file. The
 * `used` attribute asks the compiler to emit the symbol even though no code
 * visible in this listing references it.
 */
38 static const char rcsid[] __attribute__((used)) =
"$Id: flowmon-export-ipfix.c 928 2013-01-16 08:32:47Z 255519 $";
64 #include <sys/socket.h>
65 #include <arpa/inet.h>
69 #include <flowmonexp/plugin_export.h>
100 conf->
ip = AF_UNSPEC;
/*
 * Parameter parsing (partial listing: the function header and several lines
 * are not visible). The parameter string is tokenized with strtok() on comma
 * and equals sign, and each parameter name is matched against the known keys
 * (host, template-file, port, protocol, ip, reconnect-timeout, odid,
 * template-refresh-time, template-refresh-packets). Afterwards defaults are
 * announced for port, protocol, host and template refresh time.
 */
118 char *token, *value, *param;
/* A missing parameter string is reported on stderr. */
122 if (params == NULL) {
123 fprintf(stderr,
"IPFIX export plugin need parameters\n");
/*
 * NOTE(review): strtok() writes into params and keeps hidden static state.
 * It also collapses adjacent delimiters, so an empty value cannot be
 * represented and would presumably shift the name/value pairing. strtok_r()
 * or strsep() would avoid both -- confirm callers pass a writable buffer.
 */
128 token = strtok(params,
",=");
129 while (token != NULL) {
134 MSG(
"param: %s\t", param);
135 MSG(
"value: %s\n", value);
137 if (strcmp(
"host", param) == 0) {
139 }
else if (strcmp(
"template-file", param) == 0) {
141 }
else if (strcmp(
"port", param) == 0) {
143 }
else if (strcmp(
"protocol", param) == 0) {
/* Protocol names are compared case-sensitively: UDP, TCP or SCTP. */
144 if (strcmp(
"UDP", value) == 0) {
146 }
else if (strcmp(
"TCP", value) == 0) {
148 }
else if (strcmp(
"SCTP", value) == 0) {
151 fprintf(stderr,
"Unknown protocol %s\n", value);
154 }
else if (strcmp(
"ip", param) == 0) {
/*
 * NOTE(review): atoi() gives no error indication; text that is not a number
 * is read as 0, which presumably selects the default case below. strtol()
 * with an end pointer check would detect malformed input.
 */
155 int ip = atoi(value);
157 case 4: conf->
ip = AF_INET;
break;
158 case 6: conf->
ip = AF_INET6;
break;
161 conf->
flags = AI_V4MAPPED | AI_ALL;
163 default: conf->
ip = AF_UNSPEC;
break;
165 }
else if (strcmp(
"reconnect-timeout", param) == 0) {
167 }
else if (strcmp(
"odid", param) == 0) {
/* odid is converted with atoi(), so malformed input is not detected. */
168 conf->
odid = atoi(value);
169 }
else if (strcmp(
"template-refresh-time", param) == 0) {
171 }
else if (strcmp(
"template-refresh-packets", param) == 0) {
174 fprintf(stderr,
"Unknown parameter '%s'\n", param);
180 token = strtok(NULL,
",=");
186 fprintf(stderr,
"'%s' does not have a value\n", param);
/* Defaults for settings that were not supplied are announced on stdout. */
191 if (conf->
port == NULL) {
192 printf(
"Using default port 4739\n");
196 printf(
"Using default protocol UDP\n");
202 if (conf->
host == NULL) {
203 printf(
"Using default hostname 'localhost'\n");
/*
 * host is pointed at a string literal here.
 * NOTE(review): verify that no cleanup path frees or writes through
 * conf->host -- not visible in this listing.
 */
204 conf->
host =
"localhost";
207 printf(
"Using default template refresh time '600s'\n");
/*
 * Template-file record lookup by getter name (partial listing). The first
 * message indicates that a NULL name is rejected. Records are then examined
 * while the current record and its getterName are non-NULL, and the first
 * record whose getterName equals name is returned. How the loop advances and
 * what is returned on a miss are on lines not visible here.
 */
230 MSG(
"Cannot get template for NULL name\n");
234 while (tmpFileRecord && tmpFileRecord->
getterName) {
235 if (strcmp(name, tmpFileRecord->
getterName) == 0) {
236 return tmpFileRecord;
/*
 * Walks getter_list until an entry with a NULL name and logs each getter
 * name and length through MSG. Getters for which tmpFileRecord is non-NULL
 * are copied into conf->getters with getter_copy_to(). The lookup that sets
 * tmpFileRecord is on lines omitted from this listing.
 */
249 static void fei_store_getters(flow_record_getter_t *getter_list,
plugin_private_t *conf)
251 flow_record_getter_t *tmp = getter_list;
255 while (tmp && tmp->name) {
256 MSG(
"getter: %s - %i", tmp->name, tmp->length);
260 if (tmpFileRecord != NULL) {
262 getter_copy_to(&conf->
getters, tmp);
/*
 * Template file loading (partial listing). Visible steps: report a file that
 * cannot be opened, count newline characters, seek back to the start, and
 * later take a separate branch for entries where res is below 3 or the
 * getter name begins with a hash sign.
 */
279 int res, line = 1, countLines = 0, c;
286 fprintf(stderr,
"Cannot open template file %s\n", conf->
templateFile);
/* Each newline character read increments the line count. */
293 if (c ==
'\n') countLines++;
/*
 * NOTE(review): fseek() takes (stream, offset, whence). The arguments here
 * are passed as offset = SEEK_SET and whence = 0, which rewinds to the start
 * only on platforms where SEEK_SET is 0. The intended call is presumably
 * fseek(f, 0, SEEK_SET); the return value is also not checked on this line.
 */
295 fseek(f, SEEK_SET, 0);
/*
 * res is presumably the field count from a scanf-style call on omitted
 * lines -- confirm. The body of this branch is not visible here.
 */
316 }
else if (res < 3 || tfr->getterName[0] ==
'#') {
/*
 * Stores the template id, converted to network byte order with htons(), in
 * the first two bytes of tmpl->buffer. The remaining lines of the function
 * are not visible in this listing.
 * NOTE(review): the store goes through a uint16_t pointer cast, which assumes
 * buffer is suitably aligned for 16-bit access -- confirm against its
 * declaration.
 */
345 static void fei_init_template_buffer(
template_t *tmpl)
347 *((uint16_t *) &tmpl->
buffer[0]) = htons(tmpl->
id);
/*
 * Template construction for a record (partial listing). The new template
 * gets an id at least one above every id already in the list, a 4-byte
 * record header (id, field count), and one field specifier per getter that
 * is valid for the record and has a template-file record.
 */
368 flow_record_getter_t *tmpGetter = conf->
getters;
378 newTemplate->fieldCount = 0;
379 newTemplate->recordCount = 0;
/* Scan existing templates so the new id does not collide with any of them. */
382 while (tmpTemplate != NULL) {
383 if (tmpTemplate->
id >= maxID) maxID = tmpTemplate->
id + 1;
384 tmpTemplate = tmpTemplate->
next;
386 newTemplate->
id = maxID;
/* First 16-bit word of the template record: id in network byte order. */
387 ((uint16_t *) newTemplate->templateRecord)[0] = htons(newTemplate->id);
/* The record header occupies 4 bytes; field specifiers follow it. */
390 newTemplate->templateSize = 4;
393 while (tmpGetter && tmpGetter->name) {
394 if (tmpGetter->valid(tmpGetter->self, record)) {
398 if (tmpFileRecord != NULL) {
/* Field specifier: 16-bit element id, then 16-bit length, network order. */
404 *((uint16_t *) &newTemplate->templateRecord[newTemplate->templateSize]) = htons(eID);
/* A zero length in the template file record selects the getter length. */
407 if (tmpFileRecord->
length == 0) {
408 len = tmpGetter->length;
410 len = tmpFileRecord->
length;
412 *((uint16_t *) &newTemplate->templateRecord[newTemplate->templateSize+2]) = htons(len);
415 newTemplate->templateSize += 4;
/*
 * A further 32-bit value is appended and the size advanced by 4. The value
 * and the condition guarding it are not visible here (presumably the
 * enterprise number -- confirm).
 */
419 *((uint32_t *) &newTemplate->templateRecord[newTemplate->templateSize]) =
421 newTemplate->templateSize += 4;
425 getter_copy_to(&newTemplate->templateGetters, tmpGetter);
428 newTemplate->fieldCount++;
430 MSG(
"Cannot find template file record for getter %s\n", tmpGetter->name);
438 newTemplate->mask = getter_bitmap_from_record(conf->
getters, record);
/* Second 16-bit word of the template record: final field count. */
441 ((uint16_t *) newTemplate->templateRecord)[1] = htons(newTemplate->fieldCount);
/*
 * With templateFileRecords NULL the copied getters are freed; presumably an
 * allocation-failure path -- confirm against the omitted lines.
 */
445 if (!newTemplate->templateFileRecords) {
446 free(newTemplate->templateGetters);
/* Resolve the template-file record for each getter kept in the template. */
453 tmpGetter = newTemplate->templateGetters;
454 while (tmpGetter && tmpGetter->name) {
455 *tmpFileRecord = fei_get_template_record_by_name(tmpGetter->name, conf);
461 fei_init_template_buffer(newTemplate);
/* The new template starts as not exported, stamped with the current time. */
467 newTemplate->exported = 0;
468 newTemplate->exportTime = time(NULL);
/*
 * Template lookup (partial listing): computes the getter bitmap of the
 * record and walks the template list looking for an entry with the same
 * mask. What happens on a match is on lines not visible here.
 * NOTE(review): mask is uint64_t but is formatted with %lu. If MSG forwards
 * to a printf-style function this mismatches wherever unsigned long is 32
 * bits wide; PRIu64 from inttypes.h is the portable specifier.
 */
487 uint64_t mask = getter_bitmap_from_record(conf->
getters, record);
491 while (tmpTemplate != NULL) {
492 MSG(
"Comparing templates: %lu %lu\n", mask, tmpTemplate->
mask);
494 if (tmpTemplate->
mask == mask) {
497 tmpTemplate = tmpTemplate->
next;
/*
 * Record serialization into a template buffer (partial listing). For every
 * getter that is valid for the record, the element length comes from the
 * template file record, or from the getter when the file record length is 0.
 * recordLen accumulates the bytes written for this record.
 */
538 uint16_t recordLen = 0;
539 uint8_t variableLength = 0;
541 while (tmpGetter && tmpGetter->name) {
542 if (tmpGetter->valid(tmpGetter->self, record)) {
545 if ((*tmpFileRecord)->length == 0) {
546 elementLen = tmpGetter->
length;
548 elementLen = (*tmpFileRecord)->length;
/*
 * A length of -1 is resolved by asking the getter for the current length of
 * this record (presumably the variable-length case -- confirm).
 */
552 if (elementLen == -1) {
553 elementLen = tmpGetter->current_length(tmpGetter->self, record);
561 MSG(
"Template %i is full\n", tmpl->
id);
567 fprintf(stderr,
"Element %s trimmed to %i bytes\n", tmpGetter->name,
PACKET_DATA_SIZE);
/*
 * Variable-length elements: lengths below 255 are distinguished from longer
 * ones, and a 16-bit length is stored one byte past the element start.
 * NOTE(review): this looks like the IPFIX variable-length encoding (one
 * length byte, or a marker byte followed by a 16-bit length) -- confirm
 * against the omitted lines.
 */
571 if (variableLength) {
573 if (elementLen < 255) {
600 *((uint16_t *) &tmpl->
buffer[tmpl->
bufferSize + recordLen + 1]) = htons(elementLen);
608 tmpGetter->filler(tmpGetter->self, record,
611 recordLen += elementLen;
613 MSG(
"Record added to invalid template\n");
622 MSG(
"Buffered data exceeded maximum packet data size, need to export data\n");
/*
 * Fills an IPFIX header for the buffer at ptr; the visible line stores size
 * in the header length field in network byte order. How header is derived
 * from ptr and how conf is used are on lines omitted from this listing.
 * A caller in this file adds the return value to its write pointer, so the
 * result is presumably the header size in bytes -- confirm.
 */
641 static int fei_fill_ipfix_header(
char *ptr, uint16_t size,
plugin_private_t *conf)
646 header->
length = htons(size);
/*
 * Fills a template set header for the buffer at ptr; the visible line stores
 * size in the header length field in network byte order. The derivation of
 * header and the return value are on lines omitted from this listing
 * (presumably the number of header bytes written -- confirm).
 */
661 static int fei_fill_template_set_header(
char *ptr, uint16_t size)
667 header->
length = htons(size);
/*
 * Template packet assembly (partial listing). The template list is walked
 * twice: once before the packet buffer is allocated and once after the
 * header has been written. A totalSize of 0 takes a separate branch whose
 * body is not visible here.
 */
705 uint16_t totalSize = 0;
709 while (tmp != NULL) {
/* For UDP each template goes through fei_check_template_lifetime(). */
711 if (conf->
protocol == IPPROTO_UDP) {
712 fei_check_template_lifetime(tmp, conf);
721 if (totalSize == 0) {
/*
 * NOTE(review): the check of the malloc() result, if any, is on lines not
 * visible here -- confirm it exists before data is written.
 */
728 packet->
data = malloc(
sizeof(
char)*(totalSize));
/* The header helper returns how far to advance the write pointer. */
735 ptr += fei_fill_ipfix_header(ptr, totalSize, conf);
742 while (tmp != NULL) {
754 packet->
length = totalSize;
/*
 * Data packet assembly (partial listing). The write pointer starts totalSize
 * bytes into buff, templates are visited in a loop that logs each one being
 * added, and the IPFIX header is filled at the start of buff after the loop,
 * once totalSize is final. The packet then records deltaSequenceNum as its
 * flow count and totalSize as its length.
 */
774 uint32_t deltaSequenceNum = 0;
782 ptr = buff + totalSize;
786 while (tmp != NULL) {
792 MSG(
"Adding template %i of length %i to data packet\n", tmp->
id, tmp->
bufferSize);
814 fei_fill_ipfix_header(buff, totalSize, conf);
817 packet->
flows = deltaSequenceNum;
818 packet->
length = totalSize;
833 if (conf->
protocol == IPPROTO_UDP) {
/*
 * Template sending (partial listing): a template packet is built with
 * fei_create_template_packet() and passed to fei_send_packet().
 * NOTE(review): the result of fei_send_packet() is ignored on the visible
 * line -- confirm that is intended.
 */
850 if (fei_create_template_packet(&pkt, conf)) {
854 fei_send_packet(&pkt, conf);
/*
 * Data sending (partial listing): a data packet is built and sent; when
 * fei_send_packet() returns 1 the send is issued a second time. The lines
 * between the two sends are omitted from this listing.
 */
870 if (fei_create_data_packet(&pkt, conf)) {
871 if (fei_send_packet(&pkt, conf) == 1) {
873 fei_send_packet(&pkt, conf);
/*
 * Collector connection setup (partial listing). The lookup hints take the
 * configured address family and flags; each resulting address is visited,
 * its textual form is printed, and socket creation and connection failures
 * are reported with perror().
 */
889 struct addrinfo hints, *tmp;
891 memset(&hints, 0,
sizeof(hints));
892 hints.ai_family = conf->
ip;
894 hints.ai_flags = AI_ADDRCONFIG | conf->
flags;
/*
 * NOTE(review): if the failing call here is getaddrinfo() (not visible), it
 * reports errors through its return value rather than errno, so perror()
 * may print an unrelated message; gai_strerror() is the matching API.
 */
898 perror(
"Cannot get server info");
903 for (tmp = conf->
addrinfo; tmp != NULL; tmp = tmp->ai_next) {
/* Families other than AF_INET and AF_INET6 take this branch (body omitted). */
905 if (tmp->ai_family != AF_INET && tmp->ai_family != AF_INET6) {
/* Sized for the longest textual IPv6 address, which also fits IPv4. */
910 char buff[INET6_ADDRSTRLEN];
911 inet_ntop(tmp->ai_family,
912 (tmp->ai_family == AF_INET) ?
913 (
void *) &((
struct sockaddr_in *) tmp->ai_addr)->sin_addr :
914 (
void *) &((
struct sockaddr_in6 *) tmp->ai_addr)->sin6_addr,
915 (
char *) &buff,
sizeof(buff));
917 printf(
"Connecting to IP %s\n", buff);
919 MSG(
"Socket configuration: AI Family: %i, AI Socktype: %i, AI Protocol: %i\n",
920 tmp->ai_family, tmp->ai_socktype, tmp->ai_protocol);
926 perror(
"Cannot create new socket");
/*
 * The connection step is attempted only for protocols other than UDP; the
 * second half of the condition is on a line not visible here.
 */
931 if (conf->
protocol != IPPROTO_UDP &&
933 perror(
"Cannot connect to collector");
939 if (conf->
protocol != IPPROTO_UDP) {
940 printf(
"Successfully connected to collector\n");
/*
 * Reconnect handling (partial listing): when fei_connect() returns 0,
 * templates are expired and then sent again.
 */
968 if (fei_connect(conf) == 0) {
971 fei_expire_templates(conf);
972 fei_send_templates(conf);
/*
 * Packet transmission (partial listing). fei_reconnect() is consulted before
 * sending (the surrounding condition is not visible); the payload is then
 * sent with sendto() in a loop until packet->length bytes have gone out.
 * A closed connection and send errors are reported on stderr.
 */
1002 if (fei_reconnect(conf)) {
1007 while (sent < packet->length) {
/*
 * NOTE(review): the cast applies before the addition, so sent is added to a
 * void pointer. Arithmetic on void pointers is a GNU extension, not ISO C;
 * adding to a char pointer would be portable.
 */
1009 ret = sendto(conf->
socket, (
void *) packet->
data + sent, packet->
length - sent, 0,
1028 fprintf(stderr,
"Collector closed connection\n");
1045 perror(
"Cannot send data to collector");
1060 MSG(
"Packet (%li) sent to %s on port %s. Next sequence number is %i\n",
1078 return &plugin_desc;
/*
 * Plugin initialization (partial listing). Visible order: initialize the
 * configuration, parse params, connect to the collector, load the template
 * file, store the getters. A nonzero result from parsing, connecting or
 * loading enters a branch whose body is not visible here.
 */
1095 MSG(
"IPFIX export plugin init start\n");
1104 fei_init_configuration(conf);
1107 if (fei_parse_params(params, conf)) {
1112 if (fei_connect(conf) != 0) {
1117 if (fei_load_template_file(conf)) {
1122 fei_store_getters(getter_list, conf);
1124 MSG(
"IPFIX export plugin init end\n");
/*
 * Record export (partial listing). The first part dumps every valid getter
 * value as hex through MSG; the second finds or creates the template for the
 * record and adds the record to it.
 */
1144 flow_record_getter_t *tmp = conf->
getters;
/*
 * NOTE(review): block holds 1024 bytes while len comes from the getter;
 * confirm the omitted lines bound len before filler() writes into block.
 */
1145 unsigned char block[1024];
1149 while (tmp && tmp->name) {
1150 if (tmp->valid(tmp->self, record)) {
1151 int len = tmp->current_length(tmp->self, record);
1155 MSG(
"%s: ", tmp->name);
1156 tmp->filler(tmp->self, record, block, len, 0);
1157 for (i = 0; i < len; i++) {
1158 MSG(
"%02X", block[i]);
1168 template_t *tmpl = fei_find_template(record, conf);
1170 MSG(
"Creating new template\n");
1171 tmpl = fei_create_template(record, conf);
1172 MSG(
"Template created\n");
1174 MSG(
"Cannot create new template\n");
1178 MSG(
"Adding record to template\n");
/*
 * A return of 1 from fei_add_record() leads to templates and data being sent
 * and the record being added again (presumably the buffer-full case --
 * confirm against the omitted lines).
 */
1179 if (fei_add_record(record, tmpl, conf) == 1) {
1183 fei_send_templates(conf);
1186 fei_send_data(conf);
1189 fei_add_record(record, tmpl, conf);
/*
 * Flush (partial listing): logs the call, then sends templates followed by
 * buffered data.
 */
1207 MSG(
"plugin flush\n");
1211 fei_send_templates(conf);
1214 fei_send_data(conf);