43 #include <flowmonexp/plugin_export.h>
50 #define PRINTV(format, args...) do {if (VERBOSE) { fprintf(stdout, format, ##args); fflush(stdout); } } while (0)
52 #define PRINTD(format, args...) do {if (DEBUG) { fprintf(stdout, format, ##args); fflush(stdout); } } while (0)
54 #define PRINTA(format, args...) do {if (VERBOSE || DEBUG) { fprintf(stdout, format, ##args); fflush(stdout); } } while (0)
56 #define MAX_CONNINFO_LENGTH 2000
57 #define VALUES_SIZE 600
105 struct timespec lastinsert;
127 static plugin_desc_t pcap_desc = {
128 "export-http_postgres",
129 "Plugin exports flow to remote postgres database"
131 "You can find more detailed description at\n"
132 "http://www.postgresql.org/docs/8.4/static/libpq-connect.html\n"
133 " host=postgres.localnetwork.com <- hostname of server to connect to\n"
134 " port=5432 port to connect to\n"
135 " dbname=my_database name of database\n"
136 " user=username username\n"
137 " password=secret_pass password\n"
138 " connect_timeout=10 timeout of connection specified in seconds\n"
139 " hostaddr=192.168.1.10 server address, can be used instead of 'host' parameter\n"
140 " sslcert=path/cert filename of client SSL certificate\n"
141 " sslmode=allow This option determines whether or with what priority a SSL TCP/IP connection will be negotiated with the server.\n"
142 " sslkey=/path/key This parameter specifies the location for the secret key used for the client certificate\n"
143 " sslrootcert=/path/cert specifies the file name of the root SSL certificate\n"
144 " sslcrl=/path/crl specifies the file name of the SSL certificate revocation list\n"
145 " verbose=y/n switch on/off verbose mode (default: same as flowmonexp)\n"
146 " debug=y/n switch on/off verbose mode (default: same as flowmonexp)\n"
147 " insert_len=num how many records to insert with each insert command, greater insert_len -> less insert commands -> faster inserting to database (default 10000)\n"
148 " insert_time=X if X seconds passed since last insert into database, insert into database after processing next packet (default 60)\n"
149 " export_all=y insert also flows that aren't HTTP into database",
163 char *token, *value, *param;
168 if (params == NULL) {
169 fprintf(stderr,
"input-http-flow need parameters\n");
174 token = strtok(params,
",=");
175 while (token != NULL) {
180 PRINTD(
"param: %s\t", param);
181 PRINTD(
"value: %s\n", value);
184 strlen(conf->
conninfo) + strlen(param) +
186 if (strcmp(
"host", param) == 0) {
189 }
else if (strcmp(
"port", param) == 0) {
192 }
else if (strcmp(
"dbname", param) == 0) {
193 strcat(conf->
conninfo,
" dbname = ");
195 conf->
dbname = malloc(strlen(value));
196 strcpy(conf->
dbname, value);
197 }
else if (strcmp(
"user", param) == 0) {
200 }
else if (strcmp(
"password", param) == 0) {
202 strcat(conf->
conninfo,
" password = ");
204 }
else if (strcmp(
"connect_timeout", param) == 0) {
205 strcat(conf->
conninfo,
" connect_timeout = ");
207 }
else if (strcmp(
"hostaddr", param) == 0) {
208 strcat(conf->
conninfo,
" hostaddr = ");
210 }
else if (strcmp(
"sslcert", param) == 0) {
211 strcat(conf->
conninfo,
" sslcert = ");
213 }
else if (strcmp(
"sslmode", param) == 0) {
214 strcat(conf->
conninfo,
" sslmode = ");
216 }
else if (strcmp(
"sslkey", param) == 0) {
217 strcat(conf->
conninfo,
" sslkey = ");
219 }
else if (strcmp(
"sslrootcert", param) == 0) {
220 strcat(conf->
conninfo,
" sslrootcert = ");
222 }
else if (strcmp(
"sslcrl", param) == 0) {
223 strcat(conf->
conninfo,
" sslcrl = ");
225 }
else if (strcmp(
"verbose", param) == 0) {
226 if (strcmp(value,
"y") == 0) {
228 }
else if (strcmp(value,
"n") == 0) {
232 " value of 'verbose' parameter is not valid");
234 }
else if (strcmp(
"debug", param) == 0) {
235 if (strcmp(value,
"y") == 0) {
237 }
else if (strcmp(value,
"n") == 0) {
241 " value of 'debug' parameter is not valid");
243 }
else if (strcmp(
"insert_len", param) == 0) {
245 }
else if (strcmp(
"insert_time", param) == 0) {
247 }
else if (strcmp(
"export_all", param) == 0) {
248 if (strcmp(value,
"y") == 0) {
252 fprintf(stderr,
"Unknown parameter '%s'\n",
259 token = strtok(NULL,
",=");
272 static void exit_nicely(PGconn *conn)
290 "PQconnectdb returned NULL. Can't allocate memory.\n\n");
293 if (PQstatus(p->
conn) != CONNECTION_OK) {
294 fprintf(stderr,
"Connection failed to establish : %s\n\n",
295 PQerrorMessage(p->
conn));
316 void *plugin_export_init(
char *params, flow_record_getter_t *getter_list)
318 if (verbose_level >= 0) {
323 if (debug_level >= 0) {
328 PRINTA(
"plugin export init start\n");
329 char tablecheck[200];
335 flow_record_getter_t *tmp = getter_list;
342 snprintf(tablecheck,
sizeof(tablecheck) /
sizeof(
char),
343 "SELECT 1 FROM information_schema.tables WHERE table_catalog='%s' AND table_schema='public' AND table_name='flows';",
347 while (tmp && tmp->name) {
348 PRINTA(
"getter: %s - %i\n", tmp->name, tmp->length);
349 getter_copy_to(&retval->
getters, tmp);
353 fprintf(stderr,
"CONNECTION FAILED\n");
356 PRINTA(
"CONNECTION ESTABILISHED\n");
359 retval->
res = PQexec(retval->
conn, tablecheck);
360 if (PQresultStatus(retval->
res) != PGRES_TUPLES_OK) {
363 "Failed to check if table for flow data exists : %s\n",
364 PQerrorMessage(retval->
conn));
365 PQclear(retval->
res);
366 exit_nicely(retval->
conn);
368 if (PQntuples(retval->
res) == 0) {
369 PRINTA(
"Creating table for flow data...\n");
371 retval->
res = PQexec(retval->
conn,
"CREATE TABLE flows\n"
373 " \"SOURCE_IP\" inet NOT NULL,\n"
374 " \"L4_PORT_SRC\" integer NOT NULL,\n"
375 " \"DESTINATION_IP\" inet NOT NULL,\n"
376 " \"L4_PORT_DST\" integer NOT NULL,\n"
377 " \"FLOW_START_USEC\" timestamptz NOT NULL,\n"
378 " \"FLOW_END_USEC\" timestamptz NOT NULL,\n"
379 " \"L3_PROTO\" smallint NOT NULL,\n"
380 " \"PACKETS\" bigint NOT NULL,\n"
381 " \"BYTES\" bigint NOT NULL,\n"
382 " \"HTTP_CONTENT_TYPE\" smallint,\n"
383 " \"HTTP_USERAGENT\" smallint,\n"
384 " \"HTTP_METHOD\" smallint,\n"
385 " \"HTTP_DOMAIN\" character(32),\n"
386 " \"HTTP_URL\" character(32),\n"
387 " \"HTTP_REFERER\" character(32),\n"
388 " CONSTRAINT flows_pkey PRIMARY KEY (\"DESTINATION_IP\" , \"SOURCE_IP\" , \"L4_PORT_SRC\" , \"L4_PORT_DST\" , \"FLOW_START_USEC\" , \"L3_PROTO\" )\n"
393 "ALTER TABLE flows\n"
394 " OWNER TO exporter;\n"
395 "COMMENT ON TABLE flows\n"
396 " IS 'Prvni verze databaze pro HTTP export';");
397 if (PQresultStatus(retval->
res) != PGRES_COMMAND_OK) {
400 "Failed to create table for flow data : %s\n",
401 PQerrorMessage(retval->
conn));
402 PQclear(retval->
res);
403 exit_nicely(retval->
conn);
412 PRINTA(
"plugin export init end\n");
428 if (PQresultStatus(p->
res) != PGRES_COMMAND_OK) {
430 fprintf(stderr,
"Insert command failed:%s\n",
431 PQerrorMessage(p->
conn));
433 exit_nicely(p->
conn);
450 int plugin_export_export(
void *plugin_private, flow_record_t *record)
453 flow_record_getter_t *tmp = p->
getters;
455 bool is_http =
false;
456 uint8_t dest_ipv4[4];
457 uint8_t source_ipv4[4];
458 uint16_t dest_ipv6[8];
459 uint16_t source_ipv6[8];
460 memset(dest_ipv4, 0,
sizeof(dest_ipv4));
461 memset(source_ipv4, 0,
sizeof(source_ipv4));
462 memset(dest_ipv6, 0,
sizeof(dest_ipv6));
463 memset(source_ipv6, 0,
sizeof(source_ipv6));
464 uint16_t source_port = 0;
465 uint16_t dest_port = 0;
466 uint64_t time_start = 0;
467 uint64_t time_end = 0;
468 uint8_t protocol_ip = 0;
469 uint8_t useragent = 0;
471 uint64_t packets = 0;
475 uint8_t content_type = 0;
477 domain[0] = referer[0] = url[0] = 0;
478 char insert[] =
"INSERT INTO flows(\n"
479 " \"SOURCE_IP\", \"L4_PORT_SRC\", \"DESTINATION_IP\", \"L4_PORT_DST\",\n"
480 " \"L3_PROTO\" , \"FLOW_START_USEC\", \"FLOW_END_USEC\", \"PACKETS\", \"BYTES\",\"HTTP_CONTENT_TYPE\", \"HTTP_USERAGENT\", \"HTTP_METHOD\",\n"
481 "\"HTTP_DOMAIN\", \"HTTP_URL\", \"HTTP_REFERER\")\n" "VALUES";
487 strcpy(p->
buffer, insert);
489 clock_gettime(CLOCK_REALTIME, &(p->
lastinsert));
495 if (PQstatus(p->
conn) != CONNECTION_OK) {
496 fprintf(stderr,
"Connection droppped : %s\n",
497 PQerrorMessage(p->
conn));
498 while (PQstatus(p->
conn) != CONNECTION_OK) {
499 PRINTD(
"Trying to reconnect...\n");
501 fprintf(stderr,
"Reconnect failed : %s\n",
502 PQerrorMessage(p->
conn));
510 while (tmp && tmp->name) {
511 if (tmp->valid(tmp->self, record)) {
512 len = tmp->current_length(tmp->self, record);
513 if (strcmp(tmp->name,
"L3_IPV4_ADDR_DST") == 0) {
514 tmp->filler(tmp->self, record, dest_ipv4, len,
517 }
else if (strcmp(tmp->name,
"L3_IPV4_ADDR_SRC") == 0) {
518 tmp->filler(tmp->self, record, source_ipv4, len,
521 }
else if (strcmp(tmp->name,
"L3_IPV6_ADDR_DST") == 0) {
522 tmp->filler(tmp->self, record, dest_ipv6, len,
525 }
else if (strcmp(tmp->name,
"L3_IPV6_ADDR_SRC") == 0) {
526 tmp->filler(tmp->self, record, source_ipv6, len,
529 }
else if (strcmp(tmp->name,
"L4_PORT_SRC") == 0) {
530 tmp->filler(tmp->self, record, &source_port,
532 }
else if (strcmp(tmp->name,
"L4_PORT_DST") == 0) {
533 tmp->filler(tmp->self, record, &dest_port, len,
535 }
else if (strcmp(tmp->name,
"PACKETS") == 0) {
536 tmp->filler(tmp->self, record, &packets, len,
538 }
else if (strcmp(tmp->name,
"BYTES") == 0) {
539 tmp->filler(tmp->self, record, &bytes, len, 0);
540 }
else if (strcmp(tmp->name,
"FLOW_START_USEC") == 0) {
541 tmp->filler(tmp->self, record, &time_start, len,
543 }
else if (strcmp(tmp->name,
"FLOW_END_USEC") == 0) {
544 tmp->filler(tmp->self, record, &time_end, len,
546 }
else if (strcmp(tmp->name,
"HTTP_USERAGENT") == 0) {
547 tmp->filler(tmp->self, record, &useragent, len,
550 }
else if (strcmp(tmp->name,
"HTTP_METHOD") == 0) {
551 tmp->filler(tmp->self, record, &method, len, 0);
553 }
else if (strcmp(tmp->name,
"HTTP_DOMAIN") == 0) {
554 tmp->filler(tmp->self, record, domain, len, 0);
556 }
else if (strcmp(tmp->name,
"HTTP_REFERER") == 0) {
557 tmp->filler(tmp->self, record, referer, len, 0);
559 }
else if (strcmp(tmp->name,
"HTTP_CONTENT_TYPE") == 0) {
560 tmp->filler(tmp->self, record, &content_type,
563 }
else if (strcmp(tmp->name,
"HTTP_URL") == 0) {
564 tmp->filler(tmp->self, record, url, len, 0);
581 if (protocol_ip == 4) {
582 snprintf(values,
sizeof(values),
583 "( '%d.%d.%d.%d' , %d , '%d.%d.%d.%d' , %d, %d, to_timestamp('%"
584 PRIu64
".%06" PRIu64
"'), to_timestamp('%" PRIu64
585 ".%06" PRIu64
"'), %" PRIu64
", %" PRIu64
586 ", %d, %d, %d,'%s', '%s', '%s')", source_ipv4[0],
587 source_ipv4[1], source_ipv4[2], source_ipv4[3],
588 source_port, dest_ipv4[0], dest_ipv4[1], dest_ipv4[2],
589 dest_ipv4[3], dest_port, protocol_ip,
590 time_start / 1000000, time_start % 1000000,
591 time_end / 1000000, time_end % 1000000, packets, bytes,
592 content_type, useragent, method, domain, url, referer);
593 }
else if (protocol_ip == 6) {
594 snprintf(values,
sizeof(values),
595 "('%X:%X:%X:%X:%X:%X:%X:%X' ,%d, '%X:%X:%X:%X:%X:%X:%X:%X', %d, %d, to_timestamp('%"
596 PRIu64
".%06" PRIu64
"'), to_timestamp('%" PRIu64
597 ".%06" PRIu64
"'), %" PRIu64
", %" PRIu64
598 ", %d, %d, %d, '%s', '%s', '%s')", source_ipv6[7],
599 source_ipv6[6], source_ipv6[5], source_ipv6[4],
600 source_ipv6[3], source_ipv6[2], source_ipv6[1],
601 source_ipv6[0], source_port, dest_ipv6[7],
602 dest_ipv6[6], dest_ipv6[5], dest_ipv6[4], dest_ipv6[3],
603 dest_ipv6[2], dest_ipv6[1], dest_ipv6[0], dest_port,
604 protocol_ip, time_start / 1000000,
605 time_start % 1000000, time_end / 1000000,
606 time_end % 1000000, packets, bytes, content_type,
607 useragent, method, domain, url, referer);
612 strcat(p->
buffer, values);
615 struct timespec current_time;
616 clock_gettime(CLOCK_REALTIME, ¤t_time);
619 send_insert(p, current_time);
633 int plugin_export_flush(
void *plugin_private)
635 PRINTD(
"plugin flush start\n");
638 struct timespec current_time;
639 clock_gettime(CLOCK_REALTIME, ¤t_time);
640 send_insert(p, current_time);
642 PRINTD(
"plugin flush end\n");