flowmon-http-plugins  1.0
FlowMon HTTP input/process/export plugins
 All Data Structures Files Functions Variables Typedefs Macros
flowmon-export-http_postgres.c
Go to the documentation of this file.
1 
39 #include <stdio.h>
40 #include <stdlib.h>
41 #include <unistd.h>
42 #include <string.h>
43 #include <flowmonexp/plugin_export.h>
44 #include <libpq-fe.h>
45 #include <inttypes.h>
46 #include <time.h>
47 #include <stdbool.h>
48 
/* Print to stdout and flush, but only while VERBOSE is set. */
#define PRINTV(...) \
	do { \
		if (VERBOSE) { \
			fprintf(stdout, __VA_ARGS__); \
			fflush(stdout); \
		} \
	} while (0)

/* Print to stdout and flush, but only while DEBUG is set. */
#define PRINTD(...) \
	do { \
		if (DEBUG) { \
			fprintf(stdout, __VA_ARGS__); \
			fflush(stdout); \
		} \
	} while (0)

/* Print to stdout and flush when either VERBOSE or DEBUG is set. */
#define PRINTA(...) \
	do { \
		if (VERBOSE || DEBUG) { \
			fprintf(stdout, __VA_ARGS__); \
			fflush(stdout); \
		} \
	} while (0)

/* Upper bound reserved for a connection string. */
#define MAX_CONNINFO_LENGTH 2000
/* Bytes reserved in the INSERT buffer for one formatted VALUES tuple. */
#define VALUES_SIZE 600
58 
/* Registers this module with the plugin framework as an export plugin. */
62 SET_PLUGIN_TYPE(PLUGIN_TYPE_EXPORT);
63 
/* Verbosity level defined outside this file; plugin_export_init() turns VERBOSE on when it is >= 0. */
65 extern int verbose_level;
66 
/* Debug level defined outside this file; plugin_export_init() turns DEBUG on when it is >= 0. */
68 extern int debug_level;
69 
/* Gate for PRINTV/PRINTA output; may be overridden by the "verbose=y/n" plugin parameter. */
71 bool VERBOSE;
72 
/* Gate for PRINTD/PRINTA output; may be overridden by the "debug=y/n" plugin parameter. */
74 bool DEBUG;
75 
/*
 * Per-instance state of the PostgreSQL export plugin.
 *
 * NOTE(review): this listing is missing several lines of the definition.
 * Code further down also uses the members buffer_length, buffer_used,
 * insert_tick, first_record, export_all and empty_insert, and refers to the
 * type as plugin_private_export; their declarations and the closing line of
 * the typedef are not visible here - confirm against the real source file.
 */
79 typedef struct {
/* Incremented once for every record appended by plugin_export_export(). */
81  int counter;
82 
/* Getter list built in plugin_export_init() via getter_copy_to(). */
84  flow_record_getter_t *getters;
85 
/* libpq connection string assembled by parse_conninfo(). */
87  char *conninfo;
88 
/* Connection handle returned by PQconnectdb(). */
90  PGconn *conn;
91 
/* Result of the most recent PQexec() call. */
93  PGresult *res;
94 
/* Text of the multi-row INSERT statement currently being accumulated. */
96  char *buffer;
97 
100 
103 
/* Time the current statement was started or the last INSERT was sent. */
105  struct timespec lastinsert;
106 
109 
112 
/* Copy of the value given for the "dbname" parameter. */
114  char *dbname;
115 
118 
121 
123 
127 static plugin_desc_t pcap_desc = {
128  "export-http_postgres",
129  "Plugin exports flow to remote postgres database"
130  "Parameters :\n"
131  "You can find more detailed description at\n"
132  "http://www.postgresql.org/docs/8.4/static/libpq-connect.html\n"
133  " host=postgres.localnetwork.com <- hostname of server to connect to\n"
134  " port=5432 port to connect to\n"
135  " dbname=my_database name of database\n"
136  " user=username username\n"
137  " password=secret_pass password\n"
138  " connect_timeout=10 timeout of connection specified in seconds\n"
139  " hostaddr=192.168.1.10 server address, can be used instead of 'host' parameter\n"
140  " sslcert=path/cert filename of client SSL certificate\n"
141  " sslmode=allow This option determines whether or with what priority a SSL TCP/IP connection will be negotiated with the server.\n"
142  " sslkey=/path/key This parameter specifies the location for the secret key used for the client certificate\n"
143  " sslrootcert=/path/cert specifies the file name of the root SSL certificate\n"
144  " sslcrl=/path/crl specifies the file name of the SSL certificate revocation list\n"
145  " verbose=y/n switch on/off verbose mode (default: same as flowmonexp)\n"
146  " debug=y/n switch on/off verbose mode (default: same as flowmonexp)\n"
147  " insert_len=num how many records to insert with each insert command, greater insert_len -> less insert commands -> faster inserting to database (default 10000)\n"
148  " insert_time=X if X seconds passed since last insert into database, insert into database after processing next packet (default 60)\n"
149  " export_all=y insert also flows that aren't HTTP into database",
150  0,
151  0
152 };
153 
161 int parse_conninfo(char *params, plugin_private_export *conf)
162 {
163  char *token, *value, *param;
164  conf->conninfo = malloc(3);
165  conf->conninfo[0] = '\0';
166  uint8_t state = 0; /* state 0: expect param, state 1: expect value */
167  /* Check that there are parameters */
168  if (params == NULL) {
169  fprintf(stderr, "input-http-flow need parameters\n");
170  return 1;
171  }
172 
173  /* Parse parameter string */
174  token = strtok(params, ",=");
175  while (token != NULL) {
176  if (state == 0) {
177  param = token;
178  } else {
179  value = token;
180  PRINTD("param: %s\t", param);
181  PRINTD("value: %s\n", value);
182  conf->conninfo =
183  realloc(conf->conninfo,
184  strlen(conf->conninfo) + strlen(param) +
185  strlen(value) + 10);
186  if (strcmp("host", param) == 0) {
187  strcat(conf->conninfo, " host = ");
188  strcat(conf->conninfo, value);
189  } else if (strcmp("port", param) == 0) {
190  strcat(conf->conninfo, " port = ");
191  strcat(conf->conninfo, value);
192  } else if (strcmp("dbname", param) == 0) {
193  strcat(conf->conninfo, " dbname = ");
194  strcat(conf->conninfo, value);
195  conf->dbname = malloc(strlen(value));
196  strcpy(conf->dbname, value);
197  } else if (strcmp("user", param) == 0) {
198  strcat(conf->conninfo, " user = ");
199  strcat(conf->conninfo, value);
200  } else if (strcmp("password", param) == 0) {
201 
202  strcat(conf->conninfo, " password = ");
203  strcat(conf->conninfo, value);
204  } else if (strcmp("connect_timeout", param) == 0) {
205  strcat(conf->conninfo, " connect_timeout = ");
206  strcat(conf->conninfo, value);
207  } else if (strcmp("hostaddr", param) == 0) {
208  strcat(conf->conninfo, " hostaddr = ");
209  strcat(conf->conninfo, value);
210  } else if (strcmp("sslcert", param) == 0) {
211  strcat(conf->conninfo, " sslcert = ");
212  strcat(conf->conninfo, value);
213  } else if (strcmp("sslmode", param) == 0) {
214  strcat(conf->conninfo, " sslmode = ");
215  strcat(conf->conninfo, value);
216  } else if (strcmp("sslkey", param) == 0) {
217  strcat(conf->conninfo, " sslkey = ");
218  strcat(conf->conninfo, value);
219  } else if (strcmp("sslrootcert", param) == 0) {
220  strcat(conf->conninfo, " sslrootcert = ");
221  strcat(conf->conninfo, value);
222  } else if (strcmp("sslcrl", param) == 0) {
223  strcat(conf->conninfo, " sslcrl = ");
224  strcat(conf->conninfo, value);
225  } else if (strcmp("verbose", param) == 0) {
226  if (strcmp(value, "y") == 0) {
227  VERBOSE = true;
228  } else if (strcmp(value, "n") == 0) {
229  VERBOSE = false;
230  } else {
231  fprintf(stderr,
232  " value of 'verbose' parameter is not valid");
233  }
234  } else if (strcmp("debug", param) == 0) {
235  if (strcmp(value, "y") == 0) {
236  DEBUG = true;
237  } else if (strcmp(value, "n") == 0) {
238  DEBUG = false;
239  } else {
240  fprintf(stderr,
241  " value of 'debug' parameter is not valid");
242  }
243  } else if (strcmp("insert_len", param) == 0) {
244  conf->buffer_length = atoi(value) + 1;
245  } else if (strcmp("insert_time", param) == 0) {
246  conf->insert_tick = atoi(value);
247  } else if (strcmp("export_all", param) == 0) {
248  if (strcmp(value, "y") == 0) {
249  conf->export_all = true;
250  }
251  } else {
252  fprintf(stderr, "Unknown parameter '%s'\n",
253  param);
254  return 1;
255  }
256  }
257 
258  /* Move to next token */
259  token = strtok(NULL, ",=");
260  state = 1 - state;
261  }
262  PRINTD("CONNINFO : %s\n", conf->conninfo);
263  return 0;
264 }
265 
272 static void exit_nicely(PGconn *conn)
273 {
274  PQfinish(conn);
275  exit(1);
276 }
277 
/*
 * Body of SQLconnection(): opens a database connection from p->conninfo and
 * stores the handle in p->conn.
 *
 * NOTE(review): the line carrying the function signature is absent from this
 * listing; callers in this file invoke it as SQLconnection(p) with a
 * plugin_private_export pointer and compare the result against 0.
 *
 * Returns 0 when the connection is usable, 1 when PQconnectdb() returned
 * NULL, 2 when the connection status is not CONNECTION_OK.  On the failure
 * paths the handle in p->conn is not released here.
 */
285 {
286  p->conn = PQconnectdb(p->conninfo);
287  /* init connection */
288  if (!p->conn) {
289  fprintf(stderr,
290  "PQconnectdb returned NULL. Can't allocate memory.\n\n");
291  return 1;
292  }
293  if (PQstatus(p->conn) != CONNECTION_OK) {
294  fprintf(stderr, "Connection failed to establish : %s\n\n",
295  PQerrorMessage(p->conn));
296  return 2;
297  }
298  return 0;
299 }
300 
/*
 * Descriptor accessor: returns the address of this plugin's static
 * descriptor (name and help text).
 * NOTE(review): the function header comes from the PLUGIN_EXPORT_DESC macro,
 * presumably defined in flowmonexp/plugin_export.h - confirm its signature there.
 */
305 PLUGIN_EXPORT_DESC {
306  return &pcap_desc;
307 }
308 
316 void *plugin_export_init(char *params, flow_record_getter_t *getter_list)
317 {
318  if (verbose_level >= 0) {
319  VERBOSE = true;
320  } else {
321  VERBOSE = false;
322  }
323  if (debug_level >= 0) {
324  DEBUG = true;
325  } else {
326  DEBUG = false;
327  }
328  PRINTA("plugin export init start\n");
329  char tablecheck[200];
330  plugin_private_export *retval;
331  retval = malloc(sizeof(plugin_private_export));
332  if (!retval) {
333  return NULL;
334  }
335  flow_record_getter_t *tmp = getter_list;
336  retval->conninfo = NULL;
337  retval->buffer_length = 10000;
338  retval->insert_tick = 60;
339  retval->export_all = false;
340  retval->empty_insert = true;
341  parse_conninfo(params, retval);
342  snprintf(tablecheck, sizeof(tablecheck) / sizeof(char),
343  "SELECT 1 FROM information_schema.tables WHERE table_catalog='%s' AND table_schema='public' AND table_name='flows';",
344  retval->dbname);
345  retval->getters = NULL;
346  retval->counter = 0;
347  while (tmp && tmp->name) {
348  PRINTA("getter: %s - %i\n", tmp->name, tmp->length);
349  getter_copy_to(&retval->getters, tmp);
350  tmp++;
351  }
352  if (SQLconnection(retval) != 0) {
353  fprintf(stderr, "CONNECTION FAILED\n");
354  exit(3);
355  } else {
356  PRINTA("CONNECTION ESTABILISHED\n");
357  }
358 
359  retval->res = PQexec(retval->conn, tablecheck);
360  if (PQresultStatus(retval->res) != PGRES_TUPLES_OK) {
361 
362  fprintf(stderr,
363  "Failed to check if table for flow data exists : %s\n",
364  PQerrorMessage(retval->conn));
365  PQclear(retval->res);
366  exit_nicely(retval->conn);
367  }
368  if (PQntuples(retval->res) == 0) {
369  PRINTA("Creating table for flow data...\n");
370 
371  retval->res = PQexec(retval->conn, "CREATE TABLE flows\n"
372  "(\n"
373  " \"SOURCE_IP\" inet NOT NULL,\n"
374  " \"L4_PORT_SRC\" integer NOT NULL,\n"
375  " \"DESTINATION_IP\" inet NOT NULL,\n"
376  " \"L4_PORT_DST\" integer NOT NULL,\n"
377  " \"FLOW_START_USEC\" timestamptz NOT NULL,\n"
378  " \"FLOW_END_USEC\" timestamptz NOT NULL,\n"
379  " \"L3_PROTO\" smallint NOT NULL,\n"
380  " \"PACKETS\" bigint NOT NULL,\n"
381  " \"BYTES\" bigint NOT NULL,\n"
382  " \"HTTP_CONTENT_TYPE\" smallint,\n"
383  " \"HTTP_USERAGENT\" smallint,\n"
384  " \"HTTP_METHOD\" smallint,\n"
385  " \"HTTP_DOMAIN\" character(32),\n"
386  " \"HTTP_URL\" character(32),\n"
387  " \"HTTP_REFERER\" character(32),\n"
388  " CONSTRAINT flows_pkey PRIMARY KEY (\"DESTINATION_IP\" , \"SOURCE_IP\" , \"L4_PORT_SRC\" , \"L4_PORT_DST\" , \"FLOW_START_USEC\" , \"L3_PROTO\" )\n"
389  ")\n"
390  "WITH (\n"
391  " OIDS=FALSE\n"
392  ");\n"
393  "ALTER TABLE flows\n"
394  " OWNER TO exporter;\n"
395  "COMMENT ON TABLE flows\n"
396  " IS 'Prvni verze databaze pro HTTP export';");
397  if (PQresultStatus(retval->res) != PGRES_COMMAND_OK) {
398 
399  fprintf(stderr,
400  "Failed to create table for flow data : %s\n",
401  PQerrorMessage(retval->conn));
402  PQclear(retval->res);
403  exit_nicely(retval->conn);
404  }
405 
406  }
407  retval->buffer = malloc(VALUES_SIZE * sizeof(char) * retval->buffer_length + 500 * sizeof(char)); /* size of each tuple after insert + size of inser command */
408  retval->buffer_length =
409  VALUES_SIZE * sizeof(char) * retval->buffer_length +
410  500 * sizeof(char);
411  retval->buffer_used = 0;
412  PRINTA("plugin export init end\n");
413  return retval;
414 }
415 
423 void send_insert(plugin_private_export *p, struct timespec current_time)
424 {
425  strcat(p->buffer, ";");
426  PRINTA("SENDING INSERT COMMAND :\n %s\n", p->buffer);
427  p->res = PQexec(p->conn, p->buffer);
428  if (PQresultStatus(p->res) != PGRES_COMMAND_OK) {
429 
430  fprintf(stderr, "Insert command failed:%s\n",
431  PQerrorMessage(p->conn));
432  PQclear(p->res);
433  exit_nicely(p->conn);
434  }
435  PQclear(p->res);
436 
437  p->buffer_used = 0;
438  p->buffer[0] = 0;
439  p->lastinsert = current_time;
440  p->empty_insert = true;
441 }
442 
450 int plugin_export_export(void *plugin_private, flow_record_t *record)
451 {
452  plugin_private_export *p = (plugin_private_export *) plugin_private;
453  flow_record_getter_t *tmp = p->getters;
454  int len;
455  bool is_http = false;
456  uint8_t dest_ipv4[4];
457  uint8_t source_ipv4[4];
458  uint16_t dest_ipv6[8];
459  uint16_t source_ipv6[8];
460  memset(dest_ipv4, 0, sizeof(dest_ipv4));
461  memset(source_ipv4, 0, sizeof(source_ipv4));
462  memset(dest_ipv6, 0, sizeof(dest_ipv6));
463  memset(source_ipv6, 0, sizeof(source_ipv6));
464  uint16_t source_port = 0;
465  uint16_t dest_port = 0;
466  uint64_t time_start = 0;
467  uint64_t time_end = 0;
468  uint8_t protocol_ip = 0;
469  uint8_t useragent = 0;
470  uint8_t method = 0;
471  uint64_t packets = 0;
472  uint64_t bytes = 0;
473  char domain[33];
474  char referer[33];
475  uint8_t content_type = 0;
476  char url[33]; /* always long enough because of internal restrictions of flowmon, sizeof(getter)<=32*sizeof(char) */
477  domain[0] = referer[0] = url[0] = 0;
478  char insert[] = "INSERT INTO flows(\n"
479  " \"SOURCE_IP\", \"L4_PORT_SRC\", \"DESTINATION_IP\", \"L4_PORT_DST\",\n"
480  " \"L3_PROTO\" , \"FLOW_START_USEC\", \"FLOW_END_USEC\", \"PACKETS\", \"BYTES\",\"HTTP_CONTENT_TYPE\", \"HTTP_USERAGENT\", \"HTTP_METHOD\",\n"
481  "\"HTTP_DOMAIN\", \"HTTP_URL\", \"HTTP_REFERER\")\n" "VALUES";
482  char values[VALUES_SIZE];
483 
484  /* buffer is filled with mess, we are in first packet or after insert */
485 
486  if (p->buffer_used == 0) {
487  strcpy(p->buffer, insert);
488  p->buffer_used += sizeof(insert) / sizeof(char);
489  clock_gettime(CLOCK_REALTIME, &(p->lastinsert));
490  p->first_record = true;
491  }
492 
493  /* check connection */
494 
495  if (PQstatus(p->conn) != CONNECTION_OK) {
496  fprintf(stderr, "Connection droppped : %s\n",
497  PQerrorMessage(p->conn));
498  while (PQstatus(p->conn) != CONNECTION_OK) {
499  PRINTD("Trying to reconnect...\n");
500  if (SQLconnection(p) != 0) {
501  fprintf(stderr, "Reconnect failed : %s\n",
502  PQerrorMessage(p->conn));
503  }
504  }
505  PRINTA("Reconnected\n");
506  }
507 
508  /* parse getters */
509 
510  while (tmp && tmp->name) {
511  if (tmp->valid(tmp->self, record)) {
512  len = tmp->current_length(tmp->self, record);
513  if (strcmp(tmp->name, "L3_IPV4_ADDR_DST") == 0) {
514  tmp->filler(tmp->self, record, dest_ipv4, len,
515  1);
516  protocol_ip = 4;
517  } else if (strcmp(tmp->name, "L3_IPV4_ADDR_SRC") == 0) {
518  tmp->filler(tmp->self, record, source_ipv4, len,
519  1);
520  protocol_ip = 4;
521  } else if (strcmp(tmp->name, "L3_IPV6_ADDR_DST") == 0) {
522  tmp->filler(tmp->self, record, dest_ipv6, len,
523  0);
524  protocol_ip = 6;
525  } else if (strcmp(tmp->name, "L3_IPV6_ADDR_SRC") == 0) {
526  tmp->filler(tmp->self, record, source_ipv6, len,
527  0);
528  protocol_ip = 6;
529  } else if (strcmp(tmp->name, "L4_PORT_SRC") == 0) {
530  tmp->filler(tmp->self, record, &source_port,
531  len, 0);
532  } else if (strcmp(tmp->name, "L4_PORT_DST") == 0) {
533  tmp->filler(tmp->self, record, &dest_port, len,
534  0);
535  } else if (strcmp(tmp->name, "PACKETS") == 0) {
536  tmp->filler(tmp->self, record, &packets, len,
537  0);
538  } else if (strcmp(tmp->name, "BYTES") == 0) {
539  tmp->filler(tmp->self, record, &bytes, len, 0);
540  } else if (strcmp(tmp->name, "FLOW_START_USEC") == 0) {
541  tmp->filler(tmp->self, record, &time_start, len,
542  0);
543  } else if (strcmp(tmp->name, "FLOW_END_USEC") == 0) {
544  tmp->filler(tmp->self, record, &time_end, len,
545  0);
546  } else if (strcmp(tmp->name, "HTTP_USERAGENT") == 0) {
547  tmp->filler(tmp->self, record, &useragent, len,
548  0);
549  is_http = true;
550  } else if (strcmp(tmp->name, "HTTP_METHOD") == 0) {
551  tmp->filler(tmp->self, record, &method, len, 0);
552  is_http = true;
553  } else if (strcmp(tmp->name, "HTTP_DOMAIN") == 0) {
554  tmp->filler(tmp->self, record, domain, len, 0);
555  is_http = true;
556  } else if (strcmp(tmp->name, "HTTP_REFERER") == 0) {
557  tmp->filler(tmp->self, record, referer, len, 0);
558  is_http = true;
559  } else if (strcmp(tmp->name, "HTTP_CONTENT_TYPE") == 0) {
560  tmp->filler(tmp->self, record, &content_type,
561  len, 0);
562  is_http = true;
563  } else if (strcmp(tmp->name, "HTTP_URL") == 0) {
564  tmp->filler(tmp->self, record, url, len, 0);
565  is_http = true;
566  }
567  }
568  tmp++;
569  }
570  if (!is_http && !p->export_all) {
571  return 0;
572  }
573 
574  if (!p->first_record) {
575  strcat(p->buffer, ",");
576  p->buffer_used++;
577  } else {
578  p->first_record = false;
579  }
580 
581  if (protocol_ip == 4) {
582  snprintf(values, sizeof(values),
583  "( '%d.%d.%d.%d' , %d , '%d.%d.%d.%d' , %d, %d, to_timestamp('%"
584  PRIu64 ".%06" PRIu64 "'), to_timestamp('%" PRIu64
585  ".%06" PRIu64 "'), %" PRIu64 ", %" PRIu64
586  ", %d, %d, %d,'%s', '%s', '%s')", source_ipv4[0],
587  source_ipv4[1], source_ipv4[2], source_ipv4[3],
588  source_port, dest_ipv4[0], dest_ipv4[1], dest_ipv4[2],
589  dest_ipv4[3], dest_port, protocol_ip,
590  time_start / 1000000, time_start % 1000000,
591  time_end / 1000000, time_end % 1000000, packets, bytes,
592  content_type, useragent, method, domain, url, referer);
593  } else if (protocol_ip == 6) {
594  snprintf(values, sizeof(values),
595  "('%X:%X:%X:%X:%X:%X:%X:%X' ,%d, '%X:%X:%X:%X:%X:%X:%X:%X', %d, %d, to_timestamp('%"
596  PRIu64 ".%06" PRIu64 "'), to_timestamp('%" PRIu64
597  ".%06" PRIu64 "'), %" PRIu64 ", %" PRIu64
598  ", %d, %d, %d, '%s', '%s', '%s')", source_ipv6[7],
599  source_ipv6[6], source_ipv6[5], source_ipv6[4],
600  source_ipv6[3], source_ipv6[2], source_ipv6[1],
601  source_ipv6[0], source_port, dest_ipv6[7],
602  dest_ipv6[6], dest_ipv6[5], dest_ipv6[4], dest_ipv6[3],
603  dest_ipv6[2], dest_ipv6[1], dest_ipv6[0], dest_port,
604  protocol_ip, time_start / 1000000,
605  time_start % 1000000, time_end / 1000000,
606  time_end % 1000000, packets, bytes, content_type,
607  useragent, method, domain, url, referer);
608  } else {
609  return 5;
610  }
611 
612  strcat(p->buffer, values);
613  p->buffer_used += sizeof(values);
614  p->empty_insert = false;
615  struct timespec current_time;
616  clock_gettime(CLOCK_REALTIME, &current_time);
617  if ((p->buffer_length) - (p->buffer_used) <= 2 * (sizeof(values))
618  || current_time.tv_sec - (p->lastinsert).tv_sec >= p->insert_tick) {
619  send_insert(p, current_time);
620  }
621  p->counter++;
622  fflush(stdout);
623  return 0;
624 }
625 
633 int plugin_export_flush(void *plugin_private)
634 {
635  PRINTD("plugin flush start\n");
636  plugin_private_export *p = (plugin_private_export *) plugin_private;
637  if (!p->empty_insert) {
638  struct timespec current_time;
639  clock_gettime(CLOCK_REALTIME, &current_time);
640  send_insert(p, current_time);
641  }
642  PRINTD("plugin flush end\n");
643  return 0;
644 }