Compare commits

...

7 Commits

Author SHA1 Message Date
tiptorrent development team 161a49b683 open two simultaneous connections using two processes 2022-11-30 12:29:07 +01:00
tiptorrent development team 9e80376cb8 remove file size check
in preparation for multiprocessing.

Each process downloads its own chunks, this check cannot be done unless
there is a way to report each process the amount of bytes receives
(via interprocess communication).

Or simply use threads instead of process.
2022-11-30 12:28:41 +01:00
tiptorrent development team e01a38e4a7 add tip_client_report() 2022-11-30 12:27:46 +01:00
tiptorrent development team 7d70e7abec store filename in struct tip_client 2022-11-30 12:27:45 +01:00
tiptorrent development team 95a06d7111 no need to calculate chunk size in every iteration 2022-11-30 12:26:58 +01:00
Pablo Neira Ayuso d4753d6c3e add tip_client_run() 2022-11-30 12:13:36 +01:00
tiptorrent development team 9d13b48925 remove redundant check
should not ever happen.
2022-11-30 11:44:16 +01:00
1 changed files with 126 additions and 95 deletions

View File

@ -25,6 +25,7 @@
#include <sys/time.h>
#include <limits.h>
#include <syslog.h>
#include <sys/wait.h>
#include <netinet/tcp.h>
#include <fcntl.h>
@ -37,7 +38,6 @@
typeof( ((type *)0)->member ) *__mptr = (ptr); \
(type *)( (char *)__mptr - offsetof(type,member) );})
static const char *filename;
static const char *addr;
struct ev_loop *tip_main_loop;
@ -52,6 +52,7 @@ enum {
struct tip_client {
ev_io io;
struct sockaddr_in addr;
const char *filename;
char buf[10240000];
uint32_t buf_len;
uint64_t data_len;
@ -121,12 +122,12 @@ static void tip_client_progress(struct tip_client *cli, bool now)
printf("%3lu%% (%lu Mbytes/second) %s from %s:9999\n",
cli->content_len > 0 ? 100 * cli->data_len / cli->content_len : 0,
tv.tv_sec > 0 ? cli->data_len / 1024000 / tv.tv_sec : cli->data_len / 1024000,
filename, inet_ntoa(cli->addr.sin_addr));
cli->filename, inet_ntoa(cli->addr.sin_addr));
cli->tv_last = tv_cur;
}
}
static int tip_client_connect(const char *addr);
static int tip_client_connect(struct tip_client *cli, const char *addr);
static int tip_client_get_hdr(struct tip_client *cli)
{
@ -141,7 +142,7 @@ static int tip_client_get_hdr(struct tip_client *cli)
return 0;
if (!strncmp(cli->buf, "HTTP/1.1 404 Not Found", strlen("HTTP/1.1 404 Not Found"))) {
syslog(LOG_ERR, "server says file `%s' not found\n", filename);
syslog(LOG_ERR, "server says file `%s' not found\n", cli->filename);
return -1;
}
if (!strncmp(cli->buf, "HTTP/1.1 301 Moves Permanently", strlen("HTTP/1.1 301 Moves Permanently"))) {
@ -161,11 +162,11 @@ static int tip_client_get_hdr(struct tip_client *cli)
ptr[0] = '\0';
syslog(LOG_INFO, "Redirected to %s to fetch file %s\n",
redirect_addr, filename);
redirect_addr, cli->filename);
cli->redirected = true;
tip_client_close(cli);
tip_client_connect(redirect_addr);
tip_client_connect(cli, redirect_addr);
cli->state = TIP_CLIENT_GET_HEADER;
return 0;
@ -202,7 +203,7 @@ static int tip_client_get_hdr(struct tip_client *cli)
ret = write(cli->fd, payload, payload_len);
if (ret < 0) {
syslog(LOG_ERR, "failed to write to file %s: %s",
filename, strerror(errno));
cli->filename, strerror(errno));
return ret;
}
}
@ -219,7 +220,7 @@ static int tip_client_get_payload(struct tip_client *cli)
ret = write(cli->fd, cli->buf, cli->buf_len);
if (ret < 0) {
syslog(LOG_ERR, "failed to write to file %s: %s",
filename, strerror(errno));
cli->filename, strerror(errno));
return ret;
}
@ -230,7 +231,7 @@ static int tip_client_get_payload(struct tip_client *cli)
if (cli->data_len >= cli->content_len) {
if (cli->redirected) {
tip_client_close(cli);
tip_client_connect(addr);
tip_client_connect(cli, addr);
cli->state = TIP_CLIENT_POST_REDIRECT;
return 1;
@ -262,7 +263,7 @@ static int tip_client_head_hdr(struct tip_client *cli)
return 0;
if (!strncmp(cli->buf, "HTTP/1.1 404 Not Found", strlen("HTTP/1.1 404 Not Found"))) {
syslog(LOG_ERR, "server says file `%s' not found\n", filename);
syslog(LOG_ERR, "server says file `%s' not found\n", cli->filename);
return -1;
}
@ -275,21 +276,21 @@ static int tip_client_head_hdr(struct tip_client *cli)
if (cli->content_len < 0)
return -1;
if (cli->content_len == 0) {
syslog(LOG_ERR, "server reports zero size file %s", filename);
syslog(LOG_ERR, "server reports zero size file %s", cli->filename);
return -1;
}
cli->fd = open(filename, O_WRONLY | O_CREAT | O_TRUNC, 0600);
cli->fd = open(cli->filename, O_WRONLY | O_CREAT | O_TRUNC, 0600);
if (cli->fd < 0) {
syslog(LOG_ERR, "failed to open file %s: %s",
filename, strerror(errno));
cli->filename, strerror(errno));
return -1;
}
if (fallocate(cli->fd, 0, 0, cli->content_len) < 0) {
syslog(LOG_ERR, "failed to allocate room for file %s: %s",
filename, strerror(errno));
delete_file(filename);
cli->filename, strerror(errno));
delete_file(cli->filename);
cli->fatal = true;
return -1;
}
@ -402,7 +403,7 @@ static void tip_client_connect_cb(struct ev_loop *loop, struct ev_io *io, int ev
len = sizeof(cli->addr);
ret = connect(cli->io.fd, (struct sockaddr *)&cli->addr, len);
if (ret < 0) {
syslog(LOG_ERR, "failed to connect to server to fetch %s", filename);
syslog(LOG_ERR, "failed to connect to server to fetch %s", cli->filename);
tip_client_error(cli);
return;
}
@ -410,27 +411,27 @@ static void tip_client_connect_cb(struct ev_loop *loop, struct ev_io *io, int ev
switch (cli->state) {
case TIP_CLIENT_GET_HEADER:
syslog(LOG_INFO, "connected to %s to fetch file %s\n",
inet_ntoa(cli->addr.sin_addr), filename);
inet_ntoa(cli->addr.sin_addr), cli->filename);
if (cli->server_only)
snprintf(buf, sizeof(buf), "GET /%s HTTP/1.1\r\nX-Accept-Redirect: off\r\n\r\n", filename);
snprintf(buf, sizeof(buf), "GET /%s HTTP/1.1\r\nX-Accept-Redirect: off\r\n\r\n", cli->filename);
else
snprintf(buf, sizeof(buf), "GET /%s HTTP/1.1\r\n\r\n", filename);
snprintf(buf, sizeof(buf), "GET /%s HTTP/1.1\r\n\r\n", cli->filename);
break;
case TIP_CLIENT_HEAD_HEADER:
syslog(LOG_INFO, "connected to %s to get file size of %s\n",
inet_ntoa(cli->addr.sin_addr), filename);
snprintf(buf, sizeof(buf), "HEAD /%s HTTP/1.1\r\n\r\n", filename);
inet_ntoa(cli->addr.sin_addr), cli->filename);
snprintf(buf, sizeof(buf), "HEAD /%s HTTP/1.1\r\n\r\n", cli->filename);
break;
case TIP_CLIENT_POST_REDIRECT:
syslog(LOG_INFO, "connected to %s to report redirection for %s\n",
inet_ntoa(cli->addr.sin_addr), filename);
snprintf(buf, sizeof(buf), "POST /%s HTTP/1.1\r\n\r\n", filename);
inet_ntoa(cli->addr.sin_addr), cli->filename);
snprintf(buf, sizeof(buf), "POST /%s HTTP/1.1\r\n\r\n", cli->filename);
break;
}
ret = send(cli->io.fd, buf, strlen(buf), 0);
if (ret < 0) {
syslog(LOG_ERR, "failed to send request for %s", filename);
syslog(LOG_ERR, "failed to send request for %s", cli->filename);
tip_client_error(cli);
return;
}
@ -444,11 +445,10 @@ static void tip_client_connect_cb(struct ev_loop *loop, struct ev_io *io, int ev
#define TIP_TCP_KEEPALIVE_INTL 30
#define TIP_TCP_KEEPALIVE_CNT 4
static int tip_client_connect(const char *addr)
static int tip_client_connect(struct tip_client *cli, const char *addr)
{
int intl = TIP_TCP_KEEPALIVE_INTL, cnt = TIP_TCP_KEEPALIVE_CNT;
int on = 1, idle = TIP_TCP_KEEPALIVE_IDLE;
struct tip_client *cli = &_cli;
int remote_fd;
int flags;
int len;
@ -480,7 +480,8 @@ static int tip_client_connect(const char *addr)
len = sizeof(cli->addr);
ret = connect(remote_fd, (struct sockaddr *)&cli->addr, len);
if (ret < 0 && errno != EINPROGRESS) {
syslog(LOG_ERR, "failed to connect to server to fetch %s", filename);
syslog(LOG_ERR, "failed to connect to server to fetch %s",
cli->filename);
tip_client_error(cli);
return ret;
}
@ -497,7 +498,8 @@ static int tip_client_connect(const char *addr)
static int tip_client_request_file(struct tip_client *cli,
const char *server, const char *filename)
{
tip_client_connect(server);
cli->filename = filename;
tip_client_connect(cli, server);
while (cli->state != TIP_CLIENT_DONE && !cli->error)
ev_loop(tip_main_loop, 0);
@ -555,22 +557,84 @@ static void tip_client_reset_state(struct tip_client *cli, int fd,
cli->num_retries = num_retries;
}
static char _filename[PATH_MAX + 1];
static int tip_client_run(struct tip_client *cli, int fd, const char *addr,
const char *filename, uint32_t chunk,
uint64_t chunk_size)
{
int ret;
do {
tip_client_reset_state(cli, fd, chunk_size * chunk);
syslog(LOG_INFO, "Requesting file %s to server\n", filename);
cli->state = TIP_CLIENT_GET_HEADER;
ret = tip_client_request_file(cli, addr, filename);
} while (ret > 0);
if (ret < 0) {
do {
tip_client_reset_state(cli, fd, chunk_size * chunk);
cli->server_only = true;
syslog(LOG_INFO, "Requesting file %s to server only\n", filename);
cli->state = TIP_CLIENT_GET_HEADER;
ret = tip_client_request_file(cli, addr, filename);
} while (ret > 0);
}
if (ret < 0)
return ret;
if (cli->redirected)
tip_client_stats.redirects++;
else
tip_client_stats.direct_from_server++;
tip_client_progress(cli, true);
return 0;
}
static int tip_client_report(struct tip_client *cli, const struct timeval *tv,
uint64_t data_len, pid_t pid)
{
if (cli->state == TIP_CLIENT_DONE) {
printf("(%u) OK.\n", pid);
syslog(LOG_INFO, "(%u) Done in %lu.%06lu seconds (%lu Mbytes/second). "
"Direct from server: %u Redirected: %u\n",
pid, tv->tv_sec, tv->tv_usec,
tv->tv_sec > 0 ? data_len / 1024000 / tv->tv_sec : data_len / 1024000,
tip_client_stats.direct_from_server,
tip_client_stats.redirects);
return EXIT_SUCCESS;
}
printf("(%u) Failure, see syslog for details.\n", pid);
syslog(LOG_INFO, "(%u) Failure after %lu.%06lu seconds (%lu Mbytes/second). "
"Direct from server: %u Redirected: %u\n",
pid, tv->tv_sec, tv->tv_usec,
tv->tv_sec > 0 ? data_len / 1024000 / tv->tv_sec : data_len / 1024000,
tip_client_stats.direct_from_server,
tip_client_stats.redirects);
return EXIT_FAILURE;
}
#define NUM_CONN 2
int main(int argc, char *argv[])
{
uint32_t chunk_array[NUM_CONN][MAX_CHUNKS / NUM_CONN] = {};
struct timeval tv_start, tv_stop, tv;
uint64_t data_len = 0, file_size = 0;
bool file_chunk[MAX_CHUNKS] = {};
char filename[PATH_MAX + 1];
uint64_t file_size = 0;
int i, j, k, fd, ret;
uint64_t chunk_size;
int i, k, fd, ret;
if (argc != 3) {
printf("%s [ip] [file]\n", argv[0]);
return EXIT_FAILURE;
}
addr = argv[1];
filename = argv[2];
openlog("tiptorrent-client", LOG_PID, LOG_DAEMON);
@ -583,83 +647,50 @@ int main(int argc, char *argv[])
do {
tip_client_reset_state(&_cli, -1, 0);
_cli.state = TIP_CLIENT_HEAD_HEADER;
ret = tip_client_request_file(&_cli, addr, filename);
ret = tip_client_request_file(&_cli, addr, argv[2]);
} while (ret > 0);
if (ret < 0)
goto err_bailout;
if (_cli.state != TIP_CLIENT_DONE)
goto err;
fd = _cli.fd;
file_size = _cli.content_len;
chunk_size = file_size / MAX_CHUNKS;
for (i = 0; i < MAX_CHUNKS; i++) {
k = select_file_chunk(file_chunk);
snprintf(_filename, sizeof(_filename), "%s.%u", argv[2], k);
filename = _filename;
chunk_size = file_size / MAX_CHUNKS;
do {
tip_client_reset_state(&_cli, fd, chunk_size * k);
syslog(LOG_INFO, "Requesting file %s to server\n", filename);
_cli.state = TIP_CLIENT_GET_HEADER;
ret = tip_client_request_file(&_cli, addr, filename);
} while (ret > 0);
if (ret < 0) {
do {
tip_client_reset_state(&_cli, fd, chunk_size * k);
_cli.server_only = true;
syslog(LOG_INFO, "Requesting file %s to server only\n", filename);
_cli.state = TIP_CLIENT_GET_HEADER;
ret = tip_client_request_file(&_cli, addr, filename);
} while (ret > 0);
for (i = 0; i < NUM_CONN; i++) {
for (j = 0; j < MAX_CHUNKS / NUM_CONN; j++) {
k = select_file_chunk(file_chunk);
chunk_array[i][j] = k;
file_chunk[k] = true;
}
if (ret < 0)
goto err_bailout;
if (_cli.redirected)
tip_client_stats.redirects++;
else
tip_client_stats.direct_from_server++;
tip_client_progress(&_cli, true);
file_chunk[k] = true;
data_len += _cli.data_len;
}
close(fd);
err:
gettimeofday(&tv_stop, NULL);
timersub(&tv_stop, &tv_start, &tv);
if (data_len != file_size) {
syslog(LOG_ERR, "Failure, file size is %lu bytes but received %lu bytes!\n",
file_size, data_len);
return EXIT_FAILURE;
}
if (_cli.state == TIP_CLIENT_DONE) {
printf("OK.\n");
syslog(LOG_INFO, "Done in %lu.%06lu seconds (%lu Mbytes/second). "
"Direct from server: %u Redirected: %u\n",
tv.tv_sec, tv.tv_usec,
tv.tv_sec > 0 ? data_len / 1024000 / tv.tv_sec : data_len / 1024000,
tip_client_stats.direct_from_server,
tip_client_stats.redirects);
for (i = 0; i < NUM_CONN; i++) {
ret = fork();
if (ret > 0) {
for (j = 0; j < MAX_CHUNKS / NUM_CONN; j++) {
k = chunk_array[i][j];
snprintf(filename, sizeof(filename), "%s.%u", argv[2], k);
if (tip_client_run(&_cli, fd, addr, filename, k, chunk_size) < 0)
goto err_bailout;
}
break;
} else if (ret < 0) {
syslog(LOG_ERR, "failed to fork process");
break;
}
}
if (ret == 0) {
while (wait(NULL) > 0);
return EXIT_SUCCESS;
}
printf("Failure, see syslog for details.\n");
syslog(LOG_INFO, "Failure after %lu.%06lu seconds (%lu Mbytes/second). "
"Direct from server: %u Redirected: %u\n",
tv.tv_sec, tv.tv_usec,
tv.tv_sec > 0 ? data_len / 1024000 / tv.tv_sec : data_len / 1024000,
tip_client_stats.direct_from_server,
tip_client_stats.redirects);
return EXIT_FAILURE;
close(fd);
gettimeofday(&tv_stop, NULL);
timersub(&tv_stop, &tv_start, &tv);
return tip_client_report(&_cli, &tv, file_size / 2, ret);
err_bailout:
if (_cli.fatal)