aboutsummaryrefslogtreecommitdiff
path: root/src/main/java
diff options
context:
space:
mode:
authorGravatar Vitaly Takmazov2023-01-04 18:09:10 +0300
committerGravatar Vitaly Takmazov2023-01-04 18:09:10 +0300
commitcf265cbed048b9324aea4b0bd591dbb34fd6c07b (patch)
tree835eae1b65a2882f2a0a7ebbbc812efe8735a590 /src/main/java
parent703115ac6a1da44bc0759d21c3210f265675caa3 (diff)
ActivityPub: refactor delivery to followers
Diffstat (limited to 'src/main/java')
-rw-r--r--src/main/java/com/juick/ActivityPubManager.java144
1 files changed, 72 insertions, 72 deletions
diff --git a/src/main/java/com/juick/ActivityPubManager.java b/src/main/java/com/juick/ActivityPubManager.java
index 9dc6a4f0..a487afb1 100644
--- a/src/main/java/com/juick/ActivityPubManager.java
+++ b/src/main/java/com/juick/ActivityPubManager.java
@@ -35,6 +35,7 @@ import com.juick.util.MessageUtils;
import com.juick.util.formatters.PlainTextFormatter;
import com.juick.www.api.SystemActivity.ActivityType;
import com.juick.www.api.activity.helpers.ProfileUriBuilder;
+import com.juick.www.api.activity.model.Activity;
import com.juick.www.api.activity.model.Context;
import com.juick.www.api.activity.model.activities.*;
import com.juick.www.api.activity.model.objects.*;
@@ -172,21 +173,15 @@ public class ActivityPubManager implements ActivityListener, NotificationListene
Message object = event.getMessage();
User user = event.getUser();
Actor me = conversionService.convert(user, Actor.class);
- socialService.getFollowers(user).forEach(acct -> {
- try {
- Actor follower = (Actor) signatureManager.getContext(URI.create(acct)).orElseThrow();
- Update update = new Update();
- var note = makeNote(object);
- update.setId(note.getId() + "#update");
- update.setActor(me.getId());
- update.setObject(note);
- update.setPublished(Instant.now());
- logger.info("Update to follower {}", follower.getId());
- signatureManager.post(me, follower, update);
- } catch (Exception e) {
- logger.warn("{} exception", acct, e);
- }
- });
+ Update update = new Update();
+ var note = makeNote(object);
+ update.setId(note.getId() + "#update");
+ update.setActor(me.getId());
+ update.setObject(note);
+ update.setTo(Collections.singletonList(Context.ACTIVITYSTREAMS_PUBLIC));
+ update.setPublished(Instant.now());
+ logger.info("{} sends note update to followers", me.getId());
+ activityToFollowers(user, me, update);
}
@Override
@@ -199,31 +194,45 @@ public class ActivityPubManager implements ActivityListener, NotificationListene
}
}
+ private void activityToFollowers(User user, Actor from, Activity activity) {
+ socialService.getFollowers(user).forEach(acct -> {
+ var context = signatureManager.getContext(URI.create(acct));
+ context.ifPresentOrElse((follower) -> {
+ if (follower instanceof Actor) {
+ var to = (Actor) follower;
+ try {
+ signatureManager.post(from, to, activity);
+ } catch (Exception e) {
+ logger.warn("Delivery to {} failed: {}", to, e.getMessage());
+ }
+ } else {
+ try {
+ logger.warn("Unknown actor: {}", jsonMapper.writeValueAsString(follower));
+ } catch (JsonProcessingException e) {
+ logger.warn("Invalid JSON: {}", acct);
+ }
+ }
+ }, () -> {
+ logger.warn("Context not verified: {}", acct);
+ });
+ });
+ }
+
@Override
public void processUpdateUserEvent(UpdateUserEvent event) {
User user = event.getUser();
String userUri = profileUriBuilder.personUri(user);
Actor me = conversionService.convert(user, Actor.class);
- socialService.getFollowers(user).forEach(acct -> {
- try {
- var context = signatureManager.getContext(URI.create(acct));
- if (context.isPresent() && context.get() instanceof Actor follower) {
- Update update = new Update();
- update.setId(userUri + "#update");
- update.setActor(me.getId());
- update.setObject(me);
- update.setPublished(Instant.now());
- logger.info("Update to follower {}", follower.getId());
- signatureManager.post(me, follower, update);
- } else {
- logger.warn("Unhandled context: {}", acct);
- }
- } catch (Exception e) {
- logger.warn("activitypub exception", e);
- }
- });
+ Update update = new Update();
+ update.setId(userUri + "#update");
+ update.setActor(me.getId());
+ update.setObject(me);
+ update.setTo(Collections.singletonList(Context.ACTIVITYSTREAMS_PUBLIC));
+ update.setPublished(Instant.now());
+ logger.info("{} sends profile update to followers", me.getId());
+ activityToFollowers(user, me, update);
}
-
+
private void processMessage(Message msg) {
if (MessageUtils.isPM(msg) || msg.isService()) {
return;
@@ -243,20 +252,29 @@ public class ActivityPubManager implements ActivityListener, NotificationListene
subscribers.forEach(acct -> {
if (!acct.equals(profileUriBuilder.followersUri(user))) {
var context = signatureManager.getContext(URI.create(acct));
- if (context.isPresent() && context.get() instanceof Actor follower) {
- Create create = new Create();
- create.setId(note.getId());
- create.setActor(me.getId());
- create.setPublished(note.getPublished());
- create.setObject(note);
- try {
- signatureManager.post(me, follower, create);
- } catch (IOException | NoSuchAlgorithmException e) {
- logger.warn("activitypub exception", e);
+ context.ifPresentOrElse((follower) -> {
+ if (follower instanceof Actor) {
+ var to = (Actor) follower;
+ Create create = new Create();
+ create.setId(note.getId());
+ create.setActor(me.getId());
+ create.setPublished(note.getPublished());
+ create.setObject(note);
+ try {
+ signatureManager.post(me, to, create);
+ } catch (IOException | NoSuchAlgorithmException e) {
+ logger.warn("Delivery to {} failed: {}", to, e.getMessage());
+ }
+ } else {
+ try {
+ logger.warn("Unknown actor: {}", jsonMapper.writeValueAsString(follower));
+ } catch (JsonProcessingException e) {
+ logger.warn("Invalid JSON: {}", acct);
+ }
}
- } else {
- logger.warn("Unhandled context: {}", acct);
- }
+ }, () -> {
+ logger.warn("Context not verified: {}", acct);
+ });
}
});
}
@@ -287,7 +305,8 @@ public class ActivityPubManager implements ActivityListener, NotificationListene
attachment.setMediaType(HttpUtils.mediaType(msg.getAttachmentType()));
note.setAttachment(Collections.singletonList(attachment));
}
- note.setTags(msg.getTags().stream().map(t -> new Hashtag(profileUriBuilder.tagUri(t), t.getName())).collect(Collectors.toList()));
+ note.setTags(msg.getTags().stream().map(t -> new Hashtag(profileUriBuilder.tagUri(t), t.getName()))
+ .collect(Collectors.toList()));
if (msg.getReplyToUri() != null && msg.getReplyToUri().toASCIIString().length() > 0) {
Optional<Context> noteContext = signatureManager.getContext(msg.getReplyToUri());
if (noteContext.isPresent()) {
@@ -346,30 +365,11 @@ public class ActivityPubManager implements ActivityListener, NotificationListene
announce.setActor(profileUriBuilder.personUri(user));
announce.setTo(Collections.singletonList(Context.ACTIVITYSTREAMS_PUBLIC));
announce.setObject(note);
- signatureManager.getContext(URI.create(announce.getActor())).ifPresentOrElse((ctx) -> {
- if (ctx instanceof Actor) {
- socialService.getFollowers(user).forEach(acct -> {
- var follower = signatureManager.getContext(URI.create(acct));
- follower.ifPresentOrElse((person) -> {
- if (person instanceof Actor) {
- try {
- logger.info("{} announcing {} to {}", user.getName(), message.getMid(), acct);
- signatureManager.post((Actor) ctx, (Actor) person, announce);
- } catch (IOException | NoSuchAlgorithmException e) {
- logger.warn("activitypub exception", e);
- }
- } else {
- logger.warn("Unhandled context: {}", acct);
- }
- }, () -> logger.warn("Follower not found: {}", acct));
- });
- } else {
- logger.warn("Unhandled context: {}", announce.getActor());
- }
- }, () -> {
- logger.warn("Context not found: {}", announce.getActor());
- });
+ var me = conversionService.convert(user, Actor.class);
+ logger.info("{} announcing {} to followers", user.getName(), message.getMid());
+ activityToFollowers(user, me, announce);
}
+
public User actorToUser(URI uri) throws HttpBadRequestException, JsonProcessingException {
var context = signatureManager.getContext(uri);
if (context.isPresent() && context.get() instanceof Actor actor) {
@@ -381,7 +381,7 @@ public class ActivityPubManager implements ActivityListener, NotificationListene
}
return user;
} else {
- logger.warn("Unhandled context: {}", jsonMapper.writeValueAsString(context));
+ logger.warn("Unhandled context: {}", uri);
throw new HttpBadRequestException();
}
}