flowmon-export-ipfix  1.2.1
FlowMon IPFIX Export Plugin
 All Data Structures Files Functions Variables Typedefs Macros Pages
flowmon-export-ipfix.c
Go to the documentation of this file.
1 
38 static const char rcsid[] __attribute__((used)) = "$Id: flowmon-export-ipfix.c 928 2013-01-16 08:32:47Z 255519 $";
39 
57 /* This is necessary to enable GNU extension for fprintf: "%as" calls malloc */
58 #define _GNU_SOURCE
59 
60 #include <stdio.h>
61 #include <stdlib.h>
62 #include <unistd.h>
63 #include <string.h>
64 #include <sys/socket.h>
65 #include <arpa/inet.h>
66 #include <endian.h>
67 #include <errno.h>
68 
69 #include <flowmonexp/plugin_export.h>
70 #include "flowmon-export-ipfix.h"
71 
73 SET_PLUGIN_TYPE(PLUGIN_TYPE_EXPORT);
74 
75 /*********** USER FUNCTIONS ************/
76 
77 /* Forward declarations */
78 static int fei_send_packet(ipfix_packet_t *packet, plugin_private_t *conf);
79 
80 
86 static void fei_init_configuration(plugin_private_t *conf)
87 {
88  conf->getters = NULL;
89  conf->templates = NULL;
90  conf->sequenceNum = 0;
91  conf->templatesDataSize = 0;
92  conf->counter = 0;
93  conf->exportedPackets = 0;
94 
95  /* Set params */
96  conf->host = NULL;
97  conf->templateFile = NULL;
98  conf->port = NULL;
99  conf->protocol = 0;
100  conf->ip = AF_UNSPEC;
101  conf->flags = 0;
102  conf->reconnectTimeout = 0;
103  conf->lastReconnect = 0;
104  conf->odid = 0;
105  conf->templateRefreshTime = 0;
106  conf->templateRefreshPackets = 0;
107 }
108 
116 static int fei_parse_params(char *params, plugin_private_t *conf)
117 {
118  char *token, *value, *param;
119  uint8_t state = 0; /* state 0: expect param, state 1: expect value */
120 
121  /* Check that there are parameters */
122  if (params == NULL) {
123  fprintf(stderr, "IPFIX export plugin need parameters\n");
124  return 1;
125  }
126 
127  /* Parse parameter string */
128  token = strtok(params, ",=");
129  while (token != NULL) {
130  if (state == 0) {
131  param = token;
132  } else {
133  value = token;
134  MSG("param: %s\t", param);
135  MSG("value: %s\n", value);
136 
137  if (strcmp("host", param) == 0) {
138  conf->host = value;
139  } else if (strcmp("template-file", param) == 0) {
140  conf->templateFile = value;
141  } else if (strcmp("port", param) == 0) {
142  conf->port = value;
143  } else if (strcmp("protocol", param) == 0) {
144  if (strcmp("UDP", value) == 0) {
145  conf->protocol = IPPROTO_UDP;
146  } else if (strcmp("TCP", value) == 0) {
147  conf->protocol = IPPROTO_TCP;
148  } else if (strcmp("SCTP", value) == 0) {
149  conf->protocol = IPPROTO_SCTP;
150  } else {
151  fprintf(stderr, "Unknown protocol %s\n", value);
152  return 1;
153  }
154  } else if (strcmp("ip", param) == 0) {
155  int ip = atoi(value);
156  switch (ip) {
157  case 4: conf->ip = AF_INET; break;
158  case 6: conf->ip = AF_INET6; break;
159  case 64:
160  conf->ip = AF_INET6;
161  conf->flags = AI_V4MAPPED | AI_ALL;
162  break;
163  default: conf->ip = AF_UNSPEC; break;
164  }
165  } else if (strcmp("reconnect-timeout", param) == 0) {
166  conf->reconnectTimeout = atoi(value);
167  } else if (strcmp("odid", param) == 0) {
168  conf->odid = atoi(value);
169  } else if (strcmp("template-refresh-time", param) == 0) {
170  conf->templateRefreshTime = atoi(value);
171  } else if (strcmp("template-refresh-packets", param) == 0) {
172  conf->templateRefreshPackets = atoi(value);
173  } else {
174  fprintf(stderr, "Unknown parameter '%s'\n", param);
175  return 1;
176  }
177  }
178 
179  /* Move to next token */
180  token = strtok(NULL, ",=");
181  state = 1 - state;
182  }
183 
184  /* Check that every parameter has its value */
185  if (state == 1) {
186  fprintf(stderr, "'%s' does not have a value\n", param);
187  return 1;
188  }
189 
190  /* Set default values */
191  if (conf->port == NULL) {
192  printf("Using default port 4739\n");
193  conf->port = "4739";
194  }
195  if (conf->protocol == 0) {
196  printf("Using default protocol UDP\n");
197  conf->protocol = IPPROTO_UDP;
198  }
199  if (conf->reconnectTimeout < 1) {
201  }
202  if (conf->host == NULL) {
203  printf("Using default hostname 'localhost'\n");
204  conf->host = "localhost";
205  }
206  if (conf->protocol == IPPROTO_UDP && conf->templateRefreshTime == 0) {
207  printf("Using default template refresh time '600s'\n");
208  conf->templateRefreshTime = 600;
209  }
210  if (conf->templateFile == NULL) {
211  printf("Using default template file "DEFAULT_TEMPLATE_FILE"\n");
213  }
214 
215  return 0;
216 }
217 
225 static template_file_record_t *fei_get_template_record_by_name(const char *name, plugin_private_t *conf)
226 {
227  template_file_record_t *tmpFileRecord = conf->templateFileRecords;
228 
229  if (name == NULL) {
230  MSG("Cannot get template for NULL name\n");
231  return NULL;
232  }
233 
234  while (tmpFileRecord && tmpFileRecord->getterName) {
235  if (strcmp(name, tmpFileRecord->getterName) == 0) {
236  return tmpFileRecord;
237  }
238  tmpFileRecord++;
239  }
240 
241  return NULL;
242 }
243 
249 static void fei_store_getters(flow_record_getter_t *getter_list, plugin_private_t *conf)
250 {
251  flow_record_getter_t *tmp = getter_list;
252 
253  conf->getters = NULL;
254 
255  while (tmp && tmp->name) {
256  MSG("getter: %s - %i", tmp->name, tmp->length);
257  /* Find appropriate template file record */
258  template_file_record_t *tmpFileRecord = fei_get_template_record_by_name(tmp->name, conf);
259  /* Use only allowed getters */
260  if (tmpFileRecord != NULL) {
261  MSG(" (used)");
262  getter_copy_to(&conf->getters, tmp);
263  }
264  MSG("\n");
265  tmp++;
266  }
267 }
268 
277 static int fei_load_template_file(plugin_private_t *conf)
278 {
279  int res, line = 1, countLines = 0, c;
280  template_file_record_t *tfr = NULL;
281  FILE *f;
282 
283  /* Open the template file */
284  f = fopen(conf->templateFile, "r");
285  if (!f) {
286  fprintf(stderr, "Cannot open template file %s\n", conf->templateFile);
287  return 1;
288  }
289 
290  /* Count the number of lines in the file */
291  do {
292  c = fgetc(f);
293  if (c == '\n') countLines++;
294  } while (c != EOF);
295  fseek(f, SEEK_SET, 0);
296 
297  /* Allocate big enough buffer */
298  conf->templateFileRecords = calloc((countLines + 1), sizeof(template_file_record_t));
299  if (!conf->templateFileRecords) {
300  return 1;
301  }
302  tfr = conf->templateFileRecords;
303 
304  while (1) {
305  /* Fill the structure */
306  /* In newer versions of glibc, 'm' modificator is defined instead of 'a' for dynamically allocated strings */
307  res = fscanf(f, "%as %hu %hu %d\n", &tfr->getterName, &tfr->enterpriseNumber, &tfr->elementID, &tfr->length);
308 
309  /* Check the result */
310  if (res == EOF) {
311  /* End of file */
312  if (tfr->getterName) free(tfr->getterName);
313  tfr->getterName = NULL;
314  tfr = NULL;
315  break;
316  } else if (res < 3 || tfr->getterName[0] == '#') {
317  /* Not enough parameters or comment, skip line */
318  if (tfr->getterName) free(tfr->getterName);
319  tfr->getterName = NULL;
320  line++;
321  continue;
322  }
323 
324  MSG("Template element: %s, %i, %i, %i\n", tfr->getterName, tfr->enterpriseNumber, tfr->elementID, tfr->length);
325 
326  line++;
327  tfr++;
328  }
329 
330 
331  return 0;
332 }
333 
345 static void fei_init_template_buffer(template_t *tmpl)
346 {
347  *((uint16_t *) &tmpl->buffer[0]) = htons(tmpl->id);
348  /* Length will be updated later */
349  /* *((uint16_t *) &tmpl->buffer[2]) = htons(0); */
350  tmpl->bufferSize = 4;
351 }
352 
366 static template_t *fei_create_template(flow_record_t *record, plugin_private_t *conf)
367 {
368  flow_record_getter_t *tmpGetter = conf->getters;
369  uint16_t maxID = FIRST_TEMPLATE_ID, len;
370  template_t *tmpTemplate = conf->templates, *newTemplate;
371 
372  /* Create new template structure */
373  newTemplate = (template_t *) malloc(sizeof(template_t));
374  if (!newTemplate) {
375  return NULL;
376  }
377  newTemplate->templateGetters = NULL;
378  newTemplate->fieldCount = 0;
379  newTemplate->recordCount = 0;
380 
381  /* Set template ID to maximum + 1 */
382  while (tmpTemplate != NULL) {
383  if (tmpTemplate->id >= maxID) maxID = tmpTemplate->id + 1;
384  tmpTemplate = tmpTemplate->next;
385  }
386  newTemplate->id = maxID;
387  ((uint16_t *) newTemplate->templateRecord)[0] = htons(newTemplate->id);
388 
389  /* Template header size */
390  newTemplate->templateSize = 4;
391 
392  /* For every valid getter */
393  while (tmpGetter && tmpGetter->name) {
394  if (tmpGetter->valid(tmpGetter->self, record)) {
395 
396  /* Find appropriate template file record */
397  template_file_record_t *tmpFileRecord = fei_get_template_record_by_name(tmpGetter->name, conf);
398  if (tmpFileRecord != NULL) {
399  /* Set information element ID */
400  uint16_t eID = tmpFileRecord->elementID;
401  if (tmpFileRecord->enterpriseNumber != 0) {
402  eID |= 0x8000;
403  }
404  *((uint16_t *) &newTemplate->templateRecord[newTemplate->templateSize]) = htons(eID);
405 
406  /* Set element length */
407  if (tmpFileRecord->length == 0) {
408  len = tmpGetter->length;
409  } else {
410  len = tmpFileRecord->length;
411  }
412  *((uint16_t *) &newTemplate->templateRecord[newTemplate->templateSize+2]) = htons(len);
413 
414  /* Update template size */
415  newTemplate->templateSize += 4;
416 
417  /* Add enterprise number if required */
418  if (tmpFileRecord->enterpriseNumber != 0) {
419  *((uint32_t *) &newTemplate->templateRecord[newTemplate->templateSize]) =
420  htonl(tmpFileRecord->enterpriseNumber);
421  newTemplate->templateSize += 4;
422  }
423 
424  /* Store the template */
425  getter_copy_to(&newTemplate->templateGetters, tmpGetter);
426 
427  /* Increase field count */
428  newTemplate->fieldCount++;
429  } else {
430  MSG("Cannot find template file record for getter %s\n", tmpGetter->name);
431  }
432  }
433 
434  tmpGetter++;
435  }
436 
437  /* Store getter mask */
438  newTemplate->mask = getter_bitmap_from_record(conf->getters, record);
439 
440  /* Set field count */
441  ((uint16_t *) newTemplate->templateRecord)[1] = htons(newTemplate->fieldCount);
442 
443  /* Allocate memory for relevant template records loaded from file */
444  newTemplate->templateFileRecords = malloc(sizeof(template_file_record_t *)*(newTemplate->fieldCount+1));
445  if (!newTemplate->templateFileRecords) {
446  free(newTemplate->templateGetters);
447  free(newTemplate);
448  return NULL;
449  }
450 
451  /* Store relevant template records loaded from file */
452  template_file_record_t **tmpFileRecord = newTemplate->templateFileRecords;
453  tmpGetter = newTemplate->templateGetters;
454  while (tmpGetter && tmpGetter->name) {
455  *tmpFileRecord = fei_get_template_record_by_name(tmpGetter->name, conf);
456  tmpGetter++;
457  tmpFileRecord++;
458  }
459 
460  /* Initialize buffer for records */
461  fei_init_template_buffer(newTemplate);
462 
463  /* Update total template size */
464  conf->templatesDataSize += newTemplate->bufferSize;
465 
466  /* The template was not exported yet */
467  newTemplate->exported = 0;
468  newTemplate->exportTime = time(NULL);
469  newTemplate->exportPacket = conf->exportedPackets;
470 
471  /* Add the new template to the list */
472  newTemplate->next = conf->templates;
473  conf->templates = newTemplate;
474 
475  return newTemplate;
476 }
477 
485 static template_t *fei_find_template(flow_record_t *record, plugin_private_t *conf)
486 {
487  uint64_t mask = getter_bitmap_from_record(conf->getters, record);
488  template_t *tmpTemplate = conf->templates;
489 
490  /* Go over all templates */
491  while (tmpTemplate != NULL) {
492  MSG("Comparing templates: %lu %lu\n", mask, tmpTemplate->mask);
493  /* Return the one with matching mask */
494  if (tmpTemplate->mask == mask) {
495  return tmpTemplate;
496  }
497  tmpTemplate = tmpTemplate->next;
498  }
499 
500  /* Return NULL when there is no matching template */
501  return NULL;
502 }
503 
504 
533 static int fei_add_record(flow_record_t *record, template_t *tmpl, plugin_private_t *conf)
534 {
535  flow_record_getter_t *tmpGetter = tmpl->templateGetters;
536  template_file_record_t **tmpFileRecord = tmpl->templateFileRecords;
537  int32_t elementLen;
538  uint16_t recordLen = 0;
539  uint8_t variableLength = 0;
540 
541  while (tmpGetter && tmpGetter->name) {
542  if (tmpGetter->valid(tmpGetter->self, record)) {
543 
544  /* Get element length. From template file or from getter. */
545  if ((*tmpFileRecord)->length == 0) {
546  elementLen = tmpGetter->length;
547  } else {
548  elementLen = (*tmpFileRecord)->length;
549  }
550 
551  /* Handle variable length elements */
552  if (elementLen == -1) {
553  elementLen = tmpGetter->current_length(tmpGetter->self, record);
554  variableLength = 1;
555  } else {
556  variableLength = 0;
557  }
558 
559  /* Check that the buffer is not full yet, variable length can add at most 3 bytes */
560  if (tmpl->bufferSize + recordLen + elementLen + variableLength*3 > TEMPLATE_BUFFER_SIZE) {
561  MSG("Template %i is full\n", tmpl->id);
562  return 1;
563  }
564 
565  /* Trim elements that do not fit the buffer */
566  if (elementLen > PACKET_DATA_SIZE) {
567  fprintf(stderr, "Element %s trimmed to %i bytes\n", tmpGetter->name, PACKET_DATA_SIZE);
568  elementLen = PACKET_DATA_SIZE;
569  }
570 
571  if (variableLength) {
572  /* Handle writing of variable length elements */
573  if (elementLen < 255) {
574  /*
575  0 1 2 3
576  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
577  +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
578  | Length (< 255)| Information Element |
579  +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
580  | ... continuing as needed |
581  +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
582  */
583 
584  tmpl->buffer[tmpl->bufferSize + recordLen] = elementLen;
585 
586  /* record contains one byte of length information */
587  recordLen += 1;
588  } else {
589  /*
590  0 1 2 3
591  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
592  +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
593  | 255 | Length (0 to 65535) | IE |
594  +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
595  | ... continuing as needed |
596  +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
597  */
598 
599  tmpl->buffer[tmpl->bufferSize + recordLen] = 255;
600  *((uint16_t *) &tmpl->buffer[tmpl->bufferSize + recordLen + 1]) = htons(elementLen);
601 
602  /* record contains three bytes of length information */
603  recordLen += 3;
604  }
605  }
606 
607  /* Fill the value to the buffer, let getter handle the byte ordering */
608  tmpGetter->filler(tmpGetter->self, record,
609  &tmpl->buffer[tmpl->bufferSize + recordLen], elementLen, 1);
610 
611  recordLen += elementLen;
612  } else {
613  MSG("Record added to invalid template\n");
614  }
615 
616  tmpGetter++;
617  tmpFileRecord++;
618  }
619 
620  /* Check that the packet size does not exceed the limit */
621  if (conf->templatesDataSize + recordLen + IPFIX_HEADER_SIZE > PACKET_DATA_SIZE) {
622  MSG("Buffered data exceeded maximum packet data size, need to export data\n");
623  return 1;
624  }
625 
626  conf->templatesDataSize += recordLen;
627  tmpl->bufferSize += recordLen;
628  tmpl->recordCount++;
629 
630  return 0;
631 }
632 
641 static int fei_fill_ipfix_header(char *ptr, uint16_t size, plugin_private_t *conf)
642 {
643  ipfix_header_t *header = (ipfix_header_t *)ptr;
644 
645  header->version = htons(IPFIX_VERISON);
646  header->length = htons(size);
647  header->exportTime = htonl(time(NULL));
648  header->sequenceNumber = htonl(conf->sequenceNum);
649  header->observationDomainId = htonl(conf->odid);
650 
651  return IPFIX_HEADER_SIZE;
652 }
653 
661 static int fei_fill_template_set_header(char *ptr, uint16_t size)
662 {
663 
665 
666  header->id = htons(TEMPLATE_SET_ID);
667  header->length = htons(size);
668 
669  return IPFIX_SET_HEADER_SIZE;
670 }
671 
678 static void fei_check_template_lifetime(template_t *tmpl, plugin_private_t *conf)
679 {
680  if (conf->templateRefreshTime != 0 &&
681  conf->templateRefreshTime + tmpl->exportTime <= time(NULL)) {
682  MSG("Template %i refresh time expired (%is)\n", tmpl->id, conf->templateRefreshTime);
683  tmpl->exported = 0;
684  }
685 
686  if (conf->templateRefreshPackets != 0 &&
687  conf->templateRefreshPackets + tmpl->exportPacket <= conf->exportedPackets) {
688  MSG("Template %i refresh packets expired (%i packets)\n", tmpl->id, conf->templateRefreshPackets);
689  tmpl->exported = 0;
690  }
691 }
692 
702 static uint16_t fei_create_template_packet(ipfix_packet_t *packet, plugin_private_t *conf)
703 {
704  template_t *tmp = conf->templates;
705  uint16_t totalSize = 0;
706  char *ptr;
707 
708  /* Get total size */
709  while (tmp != NULL) {
710  /* Check UDP template lifetime */
711  if (conf->protocol == IPPROTO_UDP) {
712  fei_check_template_lifetime(tmp, conf);
713  }
714  if (tmp->exported == 0) {
715  totalSize += tmp->templateSize;
716  }
717  tmp = tmp->next;
718  }
719 
720  /* Check that there are templates to export */
721  if (totalSize == 0) {
722  return 0;
723  }
724 
726 
727  /* Allocate memory for the packet */
728  packet->data = malloc(sizeof(char)*(totalSize));
729  if (!packet->data) {
730  return 0;
731  }
732  ptr = packet->data;
733 
734  /* Create ipfix message header */
735  ptr += fei_fill_ipfix_header(ptr, totalSize, conf);
736  /* Create template set header */
737  ptr += fei_fill_template_set_header(ptr, totalSize-IPFIX_HEADER_SIZE);
738 
739 
740  /* Copy the templates to the packet */
741  tmp = conf->templates;
742  while (tmp != NULL) {
743  if (tmp->exported == 0) {
744  memcpy(ptr, tmp->templateRecord, tmp->templateSize);
745  ptr += tmp->templateSize;
746  /* Set the templates as exported, store time and serial number */
747  tmp->exported = 1;
748  tmp->exportTime = time(NULL);
749  tmp->exportPacket = conf->exportedPackets;
750  }
751  tmp = tmp->next;
752  }
753 
754  packet->length = totalSize;
755  packet->flows = 0;
756 
757  return totalSize;
758 }
759 
760 
770 static uint16_t fei_create_data_packet(ipfix_packet_t *packet, plugin_private_t *conf)
771 {
772  template_t *tmp = conf->templates;
773  uint16_t totalSize = IPFIX_HEADER_SIZE; /* Include IPFIX header to total size */
774  uint32_t deltaSequenceNum = 0; /* Number of exported records in this packet */
775  static char buff[PACKET_DATA_SIZE];
776  char *ptr;
777 
778  /* Pointer to internal buffer is returned */
779  packet->data = buff;
780 
781  /* Start adding data after the header */
782  ptr = buff + totalSize;
783 
784  /* Copy the data sets to the packet */
785  conf->templatesDataSize = 0; /* Erase total data size */
786  while (tmp != NULL) {
787  /* Add only templates with data that fits to one packet */
788  if (tmp->recordCount > 0 && totalSize + tmp->bufferSize <= PACKET_DATA_SIZE) {
789  memcpy(ptr, tmp->buffer, tmp->bufferSize);
790  /* Set SET length */
791  ((ipfix_template_set_header_t *) ptr)->length = htons(tmp->bufferSize);
792  MSG("Adding template %i of length %i to data packet\n", tmp->id, tmp->bufferSize);
793  ptr += tmp->bufferSize;
794  /* Count size of the data copied to buffer */
795  totalSize += tmp->bufferSize;
796  /* Delete data from buffer */
798 
799  /* Store number of exported records */
800  deltaSequenceNum += tmp->recordCount;
801  tmp->recordCount = 0;
802  }
803  /* Update total data size, include empty template buffers (only set headers) */
804  conf->templatesDataSize += tmp->bufferSize;
805  tmp = tmp->next;
806  }
807 
808  /* Check that there are packets to export */
809  if (totalSize == IPFIX_HEADER_SIZE) {
810  return 0;
811  }
812 
813  /* Create ipfix message header at the beginning */
814  fei_fill_ipfix_header(buff, totalSize, conf);
815 
816  /* Fill number of flows and size of the packet */
817  packet->flows = deltaSequenceNum;
818  packet->length = totalSize;
819 
820  return totalSize;
821 }
822 
828 static void fei_expire_templates(plugin_private_t *conf)
829 {
830  template_t *tmp;
831  for (tmp = conf->templates; tmp != NULL; tmp = tmp->next) {
832  tmp->exported = 0;
833  if (conf->protocol == IPPROTO_UDP) {
834  tmp->exportTime = time(NULL);
835  tmp->exportPacket = conf->exportedPackets;
836  }
837  }
838 }
839 
845 static void fei_send_templates(plugin_private_t *conf)
846 {
847  ipfix_packet_t pkt;
848 
849  /* Send all new templates */
850  if (fei_create_template_packet(&pkt, conf)) {
851  /* Send template packet */
852  /* After error, the plugin sends all templates after reconnection,
853  * so we need not concern about it here */
854  fei_send_packet(&pkt, conf);
855 
856  free(pkt.data);
857  }
858 }
859 
865 static void fei_send_data(plugin_private_t *conf)
866 {
867  ipfix_packet_t pkt;
868 
869  /* Send all new templates */
870  if (fei_create_data_packet(&pkt, conf)) {
871  if (fei_send_packet(&pkt, conf) == 1) {
872  /* Collector reconnected, resend the packet */
873  fei_send_packet(&pkt, conf);
874  }
875  }
876 }
877 
887 static int fei_connect(plugin_private_t *conf)
888 {
889  struct addrinfo hints, *tmp;
890 
891  memset(&hints, 0, sizeof(hints));
892  hints.ai_family = conf->ip;
893  hints.ai_protocol = conf->protocol;
894  hints.ai_flags = AI_ADDRCONFIG | conf->flags;
895 
896  /* get server address */
897  if (getaddrinfo(conf->host, conf->port, &hints, &conf->addrinfo) != 0) {
898  perror("Cannot get server info");
899  return 1;
900  }
901 
902  /* Try addrinfo strucutres one by one */
903  for (tmp = conf->addrinfo; tmp != NULL; tmp = tmp->ai_next) {
904 
905  if (tmp->ai_family != AF_INET && tmp->ai_family != AF_INET6) {
906  continue;
907  }
908 
909  /* Print information about target address */
910  char buff[INET6_ADDRSTRLEN];
911  inet_ntop(tmp->ai_family,
912  (tmp->ai_family == AF_INET) ?
913  (void *) &((struct sockaddr_in *) tmp->ai_addr)->sin_addr :
914  (void *) &((struct sockaddr_in6 *) tmp->ai_addr)->sin6_addr,
915  (char *) &buff, sizeof(buff));
916 
917  printf("Connecting to IP %s\n", buff);
918 
919  MSG("Socket configuration: AI Family: %i, AI Socktype: %i, AI Protocol: %i\n",
920  tmp->ai_family, tmp->ai_socktype, tmp->ai_protocol);
921 
922  /* create socket */
923  conf->socket = socket(conf->addrinfo->ai_family,
924  conf->addrinfo->ai_socktype, conf->addrinfo->ai_protocol);
925  if (conf->socket == -1) {
926  perror("Cannot create new socket");
927  continue;
928  }
929 
930  /* connect to server with TCP and SCTP */
931  if (conf->protocol != IPPROTO_UDP &&
932  connect(conf->socket, conf->addrinfo->ai_addr, conf->addrinfo->ai_addrlen) == -1) {
933  perror("Cannot connect to collector");
934  close(conf->socket);
935  continue;
936  }
937 
938  /* Connected, meaningless for UDP */
939  if (conf->protocol != IPPROTO_UDP) {
940  printf("Successfully connected to collector\n");
941  }
942  break;
943  }
944 
945  /* Return error when all addrinfo structures were tried*/
946  if (tmp == NULL) {
947  /* Free allocated resources */
948  freeaddrinfo(conf->addrinfo);
949  return 1;
950  }
951 
952  return 0;
953 }
954 
961 static int fei_reconnect(plugin_private_t *conf)
962 {
963  /* Check for broken connection */
964  if (conf->lastReconnect != 0) {
965  /* Check whether we need to attempt reconnection */
966  if (conf->lastReconnect + conf->reconnectTimeout <= time(NULL)) {
967  /* Try to reconnect */
968  if (fei_connect(conf) == 0) {
969  conf->lastReconnect = 0;
970  /* Resend all templates */
971  fei_expire_templates(conf);
972  fei_send_templates(conf);
973  } else {
974  /* Set new reconnect time and drop packet */
975  conf->lastReconnect = time(NULL);
976  return 1;
977  }
978  } else {
979  /* Timeout not reached, drop packet */
980  return 1;
981  }
982  }
983 
984  return 0;
985 }
986 
996 static int fei_send_packet(ipfix_packet_t *packet, plugin_private_t *conf)
997 {
998  int ret; /* Return value of sendto */
999  int sent = 0; /* Sent data size */
1000 
1001  /* Check that connection is OK or drop packet */
1002  if (fei_reconnect(conf)) {
1003  return -1;
1004  }
1005 
1006  /* sendto() does not guarantee that everything will be send in one piece */
1007  while (sent < packet->length) {
1008  /* Send data to collector (TCP and SCTP ignores last two arguments) */
1009  ret = sendto(conf->socket, (void *) packet->data + sent, packet->length - sent, 0,
1010  conf->addrinfo->ai_addr, conf->addrinfo->ai_addrlen);
1011 
1012  /* Check that the data were sent correctly */
1013  if (ret == -1) {
1014  switch (errno) {
1015  case 0: break; /* OK */
1016  case ECONNRESET:
1017  case EINTR:
1018  case ENOTCONN:
1019  case ENOTSOCK:
1020  case EPIPE:
1021  case EHOSTUNREACH:
1022  case ENETDOWN:
1023  case ENETUNREACH:
1024  case ENOBUFS:
1025  case ENOMEM:
1026 
1027  /* The connection is broken */
1028  fprintf(stderr, "Collector closed connection\n");
1029 
1030  /* free resources */
1031  close(conf->socket);
1032  freeaddrinfo(conf->addrinfo);
1033 
1034  /* Set last connection try time so that we would reconnect immediatelly */
1035  conf->lastReconnect = 1;
1036 
1037  /* Reset the sequences number since it is unique per connection */
1038  conf->sequenceNum = 0;
1039  ((ipfix_header_t *) packet->data)->sequenceNumber = 0; /* no need to change byteorder of 0 */
1040 
1041  /* Say that we should try to connect and send data again */
1042  return 1;
1043  default:
1044  /* Unknown error */
1045  perror("Cannot send data to collector");
1046  return -1;
1047  }
1048  }
1049 
1050  /* No error from sendto(), add sent data count to total */
1051  sent += ret;
1052  }
1053 
1054  /* Update sequence number for next packet */
1055  conf->sequenceNum += packet->flows;
1056 
1057  /* Increase packet counter */
1058  conf->exportedPackets++;
1059 
1060  MSG("Packet (%li) sent to %s on port %s. Next sequence number is %i\n",
1061  conf->exportedPackets, conf->host, conf->port, conf->sequenceNum);
1062 
1063  return 0;
1064 }
1065 
1066 
1067 /*********** API FUNCTIONS ************/
1068 
1076 plugin_desc_t *plugin_export_desc()
1077 {
1078  return &plugin_desc;
1079 }
1080 
1081 
1091 void *plugin_export_init(char *params, flow_record_getter_t *getter_list)
1092 {
1093  plugin_private_t *conf;
1094 
1095  MSG("IPFIX export plugin init start\n");
1096 
1097  /* Allocate private configuration structure */
1098  conf = malloc(sizeof(plugin_private_t));
1099  if (!conf) {
1100  return NULL;
1101  }
1102 
1103  /* Init plugin configuration */
1104  fei_init_configuration(conf);
1105 
1106  /* Parse input parameters */
1107  if (fei_parse_params(params, conf)) {
1108  return NULL; /* TODO How to end correctly? */
1109  }
1110 
1111  /* Connect to collector */
1112  if (fei_connect(conf) != 0) {
1113  return NULL;
1114  }
1115 
1116  /* Load template file */
1117  if (fei_load_template_file(conf)) {
1118  return NULL;
1119  }
1120 
1121  /* Store getters that will be used */
1122  fei_store_getters(getter_list, conf);
1123 
1124  MSG("IPFIX export plugin init end\n");
1125 
1126  return conf;
1127 }
1128 
1129 
1139 int plugin_export_export(void *plugin_private, flow_record_t *record)
1140 {
1141  plugin_private_t *conf = (plugin_private_t *) plugin_private;
1142 
1143 #ifdef DEBUG
1144  flow_record_getter_t *tmp = conf->getters;
1145  unsigned char block[1024];
1146  int i;
1147 
1148  MSG("FLOW: %li, ", conf->counter++);
1149  while (tmp && tmp->name) {
1150  if (tmp->valid(tmp->self, record)) {
1151  int len = tmp->current_length(tmp->self, record);
1152  if (len > 1024) {
1153  len = 1024;
1154  }
1155  MSG("%s: ", tmp->name);
1156  tmp->filler(tmp->self, record, block, len, 0);
1157  for (i = 0; i < len; i++) {
1158  MSG("%02X", block[i]);
1159  }
1160  MSG(", ");
1161  }
1162  tmp++;
1163  }
1164  MSG("\n");
1165 #endif
1166 
1167  /* Get template for the record */
1168  template_t *tmpl = fei_find_template(record, conf);
1169  if (!tmpl) {
1170  MSG("Creating new template\n");
1171  tmpl = fei_create_template(record, conf);
1172  MSG("Template created\n");
1173  if (!tmpl) {
1174  MSG("Cannot create new template\n");
1175  }
1176  }
1177 
1178  MSG("Adding record to template\n");
1179  if (fei_add_record(record, tmpl, conf) == 1) {
1180  /* Flush data when record is too big to fit */
1181 
1182  /* Send all new templates */
1183  fei_send_templates(conf);
1184 
1185  /* Send the data packet */
1186  fei_send_data(conf);
1187 
1188  /* Try to add the record again */
1189  fei_add_record(record, tmpl, conf);
1190  }
1191 
1192  return 0;
1193 }
1194 
1203 int plugin_export_flush(void *plugin_private)
1204 {
1205  plugin_private_t *conf = (plugin_private_t *) plugin_private;
1206 
1207  MSG("plugin flush\n");
1208  fflush(stdout);
1209 
1210  /* Send all new templates */
1211  fei_send_templates(conf);
1212 
1213  /* Send the data packet */
1214  fei_send_data(conf);
1215 
1216  return 0;
1217 }
1218 
1219 /* TODO Callback for close function is missing.
1220  * It is not possible to withdraw templates, free addrinfo and close connection properly. */